hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1371518 [4/6] - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project: hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/...
Date: Thu, 09 Aug 2012 22:30:33 GMT
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Aug  9 22:29:36 2012
@@ -23,7 +23,6 @@ import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -34,12 +33,14 @@ import java.util.LinkedList;
 import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -77,9 +78,10 @@ class BlockReceiver implements Closeable
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private int bytesPerChecksum;
   private int checksumSize;
-  private ByteBuffer buf; // contains one full packet.
-  private int bufRead; //amount of valid data in the buf
-  private int maxPacketReadLen;
+  
+  private PacketReceiver packetReceiver =
+      new PacketReceiver(false);
+  
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
@@ -248,6 +250,10 @@ class BlockReceiver implements Closeable
    */
   @Override
   public void close() throws IOException {
+    if (packetReceiver != null) {
+      packetReceiver.close();
+    }
+    
     IOException ioe = null;
     if (syncOnClose && (out != null || checksumOut != null)) {
       datanode.metrics.incrFsyncCount();      
@@ -365,33 +371,24 @@ class BlockReceiver implements Closeable
   /**
    * Verify multiple CRC chunks. 
    */
-  private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
-                             byte[] checksumBuf, int checksumOff ) 
-                             throws IOException {
-    while (len > 0) {
-      int chunkLen = Math.min(len, bytesPerChecksum);
-      
-      clientChecksum.update(dataBuf, dataOff, chunkLen);
-
-      if (!clientChecksum.compare(checksumBuf, checksumOff)) {
-        if (srcDataNode != null) {
-          try {
-            LOG.info("report corrupt block " + block + " from datanode " +
-                      srcDataNode + " to namenode");
-            datanode.reportRemoteBadBlock(srcDataNode, block);
-          } catch (IOException e) {
-            LOG.warn("Failed to report bad block " + block + 
-                      " from datanode " + srcDataNode + " to namenode");
-          }
+  private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
+      throws IOException {
+    try {
+      clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
+    } catch (ChecksumException ce) {
+      LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
+      if (srcDataNode != null) {
+        try {
+          LOG.info("report corrupt block " + block + " from datanode " +
+                    srcDataNode + " to namenode");
+          datanode.reportRemoteBadBlock(srcDataNode, block);
+        } catch (IOException e) {
+          LOG.warn("Failed to report bad block " + block + 
+                    " from datanode " + srcDataNode + " to namenode");
         }
-        throw new IOException("Unexpected checksum mismatch " + 
-                              "while writing " + block + " from " + inAddr);
       }
-
-      clientChecksum.reset();
-      dataOff += chunkLen;
-      checksumOff += checksumSize;
-      len -= chunkLen;
+      throw new IOException("Unexpected checksum mismatch " + 
+                            "while writing " + block + " from " + inAddr);
     }
   }
   
@@ -403,163 +400,24 @@ class BlockReceiver implements Closeable
    * This does not verify the original checksums, under the assumption
    * that they have already been validated.
    */
-  private void translateChunks( byte[] dataBuf, int dataOff, int len,
-      byte[] checksumBuf, int checksumOff ) {
-    if (len == 0) return;
-    
-    int numChunks = (len - 1)/bytesPerChecksum + 1;
-    
-    diskChecksum.calculateChunkedSums(
-        ByteBuffer.wrap(dataBuf, dataOff, len),
-        ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
+  private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
+    diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
   }
 
-  /**
-   * Makes sure buf.position() is zero without modifying buf.remaining().
-   * It moves the data if position needs to be changed.
-   */
-  private void shiftBufData() {
-    if (bufRead != buf.limit()) {
-      throw new IllegalStateException("bufRead should be same as " +
-                                      "buf.limit()");
-    }
-    
-    //shift the remaining data on buf to the front
-    if (buf.position() > 0) {
-      int dataLeft = buf.remaining();
-      if (dataLeft > 0) {
-        byte[] b = buf.array();
-        System.arraycopy(b, buf.position(), b, 0, dataLeft);
-      }
-      buf.position(0);
-      bufRead = dataLeft;
-      buf.limit(bufRead);
-    }
-  }
-  
-  /**
-   * reads upto toRead byte to buf at buf.limit() and increments the limit.
-   * throws an IOException if read does not succeed.
-   */
-  private int readToBuf(int toRead) throws IOException {
-    if (toRead < 0) {
-      toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
-               - buf.limit();
-    }
-    
-    int nRead = in.read(buf.array(), buf.limit(), toRead);
-    
-    if (nRead < 0) {
-      throw new EOFException("while trying to read " + toRead + " bytes");
-    }
-    bufRead = buf.limit() + nRead;
-    buf.limit(bufRead);
-    return nRead;
-  }
-  
-  
-  /**
-   * Reads (at least) one packet and returns the packet length.
-   * buf.position() points to the start of the packet and 
-   * buf.limit() point to the end of the packet. There could 
-   * be more data from next packet in buf.<br><br>
-   * 
-   * It tries to read a full packet with single read call.
-   * Consecutive packets are usually of the same length.
-   */
-  private void readNextPacket() throws IOException {
-    /* This dances around buf a little bit, mainly to read 
-     * full packet with single read and to accept arbitrary size  
-     * for next packet at the same time.
-     */
-    if (buf == null) {
-      /* initialize buffer to the best guess size:
-       * 'chunksPerPacket' calculation here should match the same 
-       * calculation in DFSClient to make the guess accurate.
-       */
-      int chunkSize = bytesPerChecksum + checksumSize;
-      int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN
-                             + chunkSize - 1)/chunkSize;
-      buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
-                                Math.max(chunksPerPacket, 1) * chunkSize);
-      buf.limit(0);
-    }
-    
-    // See if there is data left in the buffer :
-    if (bufRead > buf.limit()) {
-      buf.limit(bufRead);
-    }
-    
-    while (buf.remaining() < HdfsConstants.BYTES_IN_INTEGER) {
-      if (buf.position() > 0) {
-        shiftBufData();
-      }
-      readToBuf(-1);
-    }
-    
-    /* We mostly have the full packet or at least enough for an int
-     */
-    buf.mark();
-    int payloadLen = buf.getInt();
-    buf.reset();
-    
-    // check corrupt values for pktLen, 100MB upper limit should be ok?
-    if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
-      throw new IOException("Incorrect value for packet payload : " +
-                            payloadLen);
-    }
-    
-    // Subtract BYTES_IN_INTEGER since that accounts for the payloadLen that
-    // we read above.
-    int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN
-        - HdfsConstants.BYTES_IN_INTEGER;
-    
-    if (buf.remaining() < pktSize) {
-      //we need to read more data
-      int toRead = pktSize - buf.remaining();
-      
-      // first make sure buf has enough space.        
-      int spaceLeft = buf.capacity() - buf.limit();
-      if (toRead > spaceLeft && buf.position() > 0) {
-        shiftBufData();
-        spaceLeft = buf.capacity() - buf.limit();
-      }
-      if (toRead > spaceLeft) {
-        byte oldBuf[] = buf.array();
-        int toCopy = buf.limit();
-        buf = ByteBuffer.allocate(toCopy + toRead);
-        System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
-        buf.limit(toCopy);
-      }
-      
-      //now read:
-      while (toRead > 0) {
-        toRead -= readToBuf(toRead);
-      }
-    }
-    
-    if (buf.remaining() > pktSize) {
-      buf.limit(buf.position() + pktSize);
-    }
-    
-    if (pktSize > maxPacketReadLen) {
-      maxPacketReadLen = pktSize;
-    }
-  }
-  
+
   /** 
    * Receives and processes a packet. It can contain many chunks.
    * returns the number of data bytes that the packet has.
    */
   private int receivePacket() throws IOException {
     // read the next packet
-    readNextPacket();
+    packetReceiver.receiveNextPacket(in);
 
-    buf.mark();
-    PacketHeader header = new PacketHeader();
-    header.readFields(buf);
-    int endOfHeader = buf.position();
-    buf.reset();
+    PacketHeader header = packetReceiver.getHeader();
+    if (LOG.isDebugEnabled()){
+      LOG.debug("Receiving one packet for block " + block +
+                ": " + header);
+    }
 
     // Sanity check the header
     if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
@@ -574,38 +432,12 @@ class BlockReceiver implements Closeable
                             header.getDataLen()); 
     }
 
-    return receivePacket(
-      header.getOffsetInBlock(),
-      header.getSeqno(),
-      header.isLastPacketInBlock(),
-      header.getDataLen(),
-      header.getSyncBlock(),
-      endOfHeader);
-  }
+    long offsetInBlock = header.getOffsetInBlock();
+    long seqno = header.getSeqno();
+    boolean lastPacketInBlock = header.isLastPacketInBlock();
+    int len = header.getDataLen();
+    boolean syncBlock = header.getSyncBlock();
 
-  /**
-   * Write the received packet to disk (data only)
-   */
-  private void writePacketToDisk(byte[] pktBuf, int startByteToDisk, 
-      int numBytesToDisk) throws IOException {
-    out.write(pktBuf, startByteToDisk, numBytesToDisk);
-  }
-  
-  /** 
-   * Receives and processes a packet. It can contain many chunks.
-   * returns the number of data bytes that the packet has.
-   */
-  private int receivePacket(long offsetInBlock, long seqno,
-      boolean lastPacketInBlock, int len, boolean syncBlock,
-      int endOfHeader) throws IOException {
-    if (LOG.isDebugEnabled()){
-      LOG.debug("Receiving one packet for block " + block +
-                " of length " + len +
-                " seqno " + seqno +
-                " offsetInBlock " + offsetInBlock +
-                " syncBlock " + syncBlock +
-                " lastPacketInBlock " + lastPacketInBlock);
-    }
     // make sure the block gets sync'ed upon close
     this.syncOnClose |= syncBlock && lastPacketInBlock;
 
@@ -625,14 +457,15 @@ class BlockReceiver implements Closeable
     //First write the packet to the mirror:
     if (mirrorOut != null && !mirrorError) {
       try {
-        mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+        packetReceiver.mirrorPacketTo(mirrorOut);
         mirrorOut.flush();
       } catch (IOException e) {
         handleMirrorOutError(e);
       }
     }
     
-    buf.position(endOfHeader);        
+    ByteBuffer dataBuf = packetReceiver.getDataSlice();
+    ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
     
     if (lastPacketInBlock || len == 0) {
       if(LOG.isDebugEnabled()) {
@@ -646,18 +479,11 @@ class BlockReceiver implements Closeable
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
 
-      if ( buf.remaining() != (checksumLen + len)) {
-        throw new IOException("Data remaining in packet does not match" +
-                              "sum of checksumLen and dataLen " +
-                              " size remaining: " + buf.remaining() +
-                              " data len: " + len +
-                              " checksum Len: " + checksumLen);
-      }
-      int checksumOff = buf.position();
-      int dataOff = checksumOff + checksumLen;
-      byte pktBuf[] = buf.array();
-
-      buf.position(buf.limit()); // move to the end of the data.
+      if ( checksumBuf.capacity() != checksumLen) {
+        throw new IOException("Length of checksums in packet " +
+            checksumBuf.capacity() + " does not match calculated checksum " +
+            "length " + checksumLen);
+     }
 
       /* skip verifying checksum iff this is not the last one in the 
        * pipeline and clientName is non-null. i.e. Checksum is verified
@@ -667,11 +493,11 @@ class BlockReceiver implements Closeable
        * checksum.
        */
       if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
-        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+        verifyChunks(dataBuf, checksumBuf);
         if (needsChecksumTranslation) {
           // overwrite the checksums in the packet buffer with the
           // appropriate polynomial for the disk storage.
-          translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+          translateChunks(dataBuf, checksumBuf);
         }
       }
       
@@ -700,9 +526,13 @@ class BlockReceiver implements Closeable
             computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
           }
 
-          int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
+          int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
+              + dataBuf.arrayOffset() + dataBuf.position();
+
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
-          writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
+          
+          // Write data to disk.
+          out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
 
           // If this is a partial chunk, then verify that this is the only
           // chunk in the packet. Calculate new crc for this chunk.
@@ -714,7 +544,7 @@ class BlockReceiver implements Closeable
                                     " len = " + len + 
                                     " bytesPerChecksum " + bytesPerChecksum);
             }
-            partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
+            partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
             byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
             lastChunkChecksum = Arrays.copyOfRange(
               buf, buf.length - checksumSize, buf.length
@@ -726,11 +556,12 @@ class BlockReceiver implements Closeable
             partialCrc = null;
           } else {
             lastChunkChecksum = Arrays.copyOfRange(
-              pktBuf, 
-              checksumOff + checksumLen - checksumSize, 
-              checksumOff + checksumLen
-            );
-            checksumOut.write(pktBuf, checksumOff, checksumLen);
+                checksumBuf.array(),
+                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
+                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
+            checksumOut.write(checksumBuf.array(),
+                checksumBuf.arrayOffset() + checksumBuf.position(),
+                checksumLen);
           }
           /// flush entire packet, sync unless close() will sync
           flushOrSync(syncBlock && !lastPacketInBlock);
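
The rewritten verifyChunks() above hands whole ByteBuffer slices to clientChecksum.verifyChunkedSums() instead of walking the packet byte[] manually. As a JDK-only sketch of what that chunked verification amounts to (plain java.util.zip.CRC32 stands in for the configurable DataChecksum, and every name below is illustrative rather than a Hadoop API):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class ChunkedCrcSketch {
  // Check each bytesPerChecksum-sized slice of 'data' against the next
  // 4-byte checksum in 'sums'; the final chunk may be shorter.
  static void verifyChunkedSums(ByteBuffer data, ByteBuffer sums,
      int bytesPerChecksum) throws IOException {
    ByteBuffer d = data.duplicate();
    ByteBuffer s = sums.duplicate();
    CRC32 crc = new CRC32();
    while (d.hasRemaining()) {
      int chunkLen = Math.min(bytesPerChecksum, d.remaining());
      byte[] chunk = new byte[chunkLen];
      d.get(chunk);
      crc.reset();
      crc.update(chunk, 0, chunkLen);
      if ((int) crc.getValue() != s.getInt()) {
        throw new IOException("Checksum mismatch in chunk ending at offset "
            + d.position());
      }
    }
  }
}

Surfacing the mismatch as an exception, as the new code does with ChecksumException, lets BlockReceiver report the corrupt upstream replica to the namenode before failing the write.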

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Aug  9 22:29:36 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -38,7 +37,6 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
@@ -64,40 +62,29 @@ import org.apache.hadoop.util.DataChecks
  * </pre>   
  * An empty packet is sent to mark the end of block and read completion.
  * 
- *  PACKET Contains a packet header, checksum and data. Amount of data
- *  carried is set by BUFFER_SIZE.
- *  <pre>
- *    +-----------------------------------------------------+
- *    | 4 byte packet length (excluding packet header)      |
- *    +-----------------------------------------------------+
- *    | 8 byte offset in the block | 8 byte sequence number |
- *    +-----------------------------------------------------+
- *    | 1 byte isLastPacketInBlock                          |
- *    +-----------------------------------------------------+
- *    | 4 byte Length of actual data                        |
- *    +-----------------------------------------------------+
- *    | x byte checksum data. x is defined below            |
- *    +-----------------------------------------------------+
- *    | actual data ......                                  |
- *    +-----------------------------------------------------+
- *    
- *    Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
- *    A checksum is calculated for each chunk.
- *    
- *    x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
- *        CHECKSUM_SIZE
- *        
- *    CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) 
- *    </pre>
+ * PACKET Contains a packet header, checksum and data. Amount of data
+ * carried is set by BUFFER_SIZE.
+ * <pre>
+ *   +-----------------------------------------------------+
+ *   | Variable length header. See {@link PacketHeader}    |
+ *   +-----------------------------------------------------+
+ *   | x byte checksum data. x is defined below            |
+ *   +-----------------------------------------------------+
+ *   | actual data ......                                  |
+ *   +-----------------------------------------------------+
+ * 
+ *   Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
+ *   A checksum is calculated for each chunk.
+ *  
+ *   x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ *       CHECKSUM_SIZE
+ *  
+ *   CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) 
+ *  </pre>
  *  
  *  The client reads data until it receives a packet with 
  *  "LastPacketInBlock" set to true or with a zero length. If there is 
- *  no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK: 
- *  <pre>
- *    +------------------------------+
- *    | 2 byte OP_STATUS_CHECKSUM_OK |
- *    +------------------------------+
- *  </pre>
+ *  no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK.
  */
 class BlockSender implements java.io.Closeable {
   static final Log LOG = DataNode.LOG;
@@ -163,8 +150,6 @@ class BlockSender implements java.io.Clo
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
   
-  private static ReadaheadPool readaheadPool =
-    ReadaheadPool.getInstance();
 
   /**
    * Constructor
@@ -452,8 +437,22 @@ class BlockSender implements java.io.Clo
     int packetLen = dataLen + checksumDataLen + 4;
     boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
 
-    writePacketHeader(pkt, dataLen, packetLen);
-
+    // The packet buffer is organized as follows:
+    // _______HHHHCCCCD?D?D?D?
+    //        ^   ^
+    //        |   \ checksumOff
+    //        \ headerOff
+    // _ padding, since the header is variable-length
+    // H = header and length prefixes
+    // C = checksums
+    // D? = data, if transferTo is false.
+    
+    int headerLen = writePacketHeader(pkt, dataLen, packetLen);
+    
+    // Per above, the header doesn't start at the beginning of the
+    // buffer
+    int headerOff = pkt.position() - headerLen;
+    
     int checksumOff = pkt.position();
     byte[] buf = pkt.array();
     
@@ -483,7 +482,8 @@ class BlockSender implements java.io.Clo
     try {
       if (transferTo) {
         SocketOutputStream sockOut = (SocketOutputStream)out;
-        sockOut.write(buf, 0, dataOff); // First write checksum
+        // First write header and checksums
+        sockOut.write(buf, headerOff, dataOff - headerOff);
         
         // no need to flush since we know out is not a buffered stream
         FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
@@ -496,7 +496,7 @@ class BlockSender implements java.io.Clo
         blockInPosition += dataLen;
       } else {
         // normal transfer
-        out.write(buf, 0, dataOff + dataLen);
+        out.write(buf, headerOff, dataOff + dataLen - headerOff);
       }
     } catch (IOException e) {
       if (e instanceof SocketTimeoutException) {
@@ -629,7 +629,7 @@ class BlockSender implements java.io.Clo
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
       int maxChunksPerPacket;
-      int pktSize = PacketHeader.PKT_HEADER_LEN;
+      int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
       boolean transferTo = transferToAllowed && !verifyChecksum
           && baseStream instanceof SocketOutputStream
           && blockIn instanceof FileInputStream;
@@ -640,15 +640,15 @@ class BlockSender implements java.io.Clo
         maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
         
         // Smaller packet size to only hold checksum when doing transferTo
-        pktSize += checksumSize * maxChunksPerPacket;
+        pktBufSize += checksumSize * maxChunksPerPacket;
       } else {
         maxChunksPerPacket = Math.max(1,
             numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
         // Packet size includes both checksum and data
-        pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
+        pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
       }
 
-      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+      ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
 
       while (endOffset > offset) {
         manageOsCache();
@@ -691,8 +691,8 @@ class BlockSender implements java.io.Clo
     }
 
     // Perform readahead if necessary
-    if (readaheadLength > 0 && readaheadPool != null) {
-      curReadahead = readaheadPool.readaheadStream(
+    if (readaheadLength > 0 && datanode.readaheadPool != null) {
+      curReadahead = datanode.readaheadPool.readaheadStream(
           clientTraceFmt, blockInFd,
           offset, readaheadLength, Long.MAX_VALUE,
           curReadahead);
@@ -718,14 +718,19 @@ class BlockSender implements java.io.Clo
   }
 
   /**
-   * Write packet header into {@code pkt}
+   * Write packet header into {@code pkt},
+   * return the length of the header written.
    */
-  private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
+  private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
     pkt.clear();
     // both syncBlock and syncPacket are false
     PacketHeader header = new PacketHeader(packetLen, offset, seqno,
         (dataLen == 0), dataLen, false);
+    
+    int size = header.getSerializedSize();
+    pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size);
     header.putInBuffer(pkt);
+    return size;
   }
   
   boolean didSendEntireByteRange() {
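
The buffer-layout comment introduced above ("_______HHHHCCCCD?D?D?D?") is the heart of this change: the header is now variable-length, so the packet buffer reserves PKT_MAX_HEADER_LEN bytes and writePacketHeader() places the serialized header right-aligned in that slot, ending exactly where the checksums begin. A minimal JDK-only sketch of that positioning (the 33-byte maximum and the byte[] header are illustrative stand-ins for PacketHeader):

import java.nio.ByteBuffer;

public class HeaderLayoutSketch {
  static final int PKT_MAX_HEADER_LEN = 33; // illustrative upper bound

  // Write a variable-length header so that it ends at PKT_MAX_HEADER_LEN
  // and return its length, mirroring the new writePacketHeader().
  static int writeHeader(ByteBuffer pkt, byte[] serializedHeader) {
    pkt.clear();
    pkt.position(PKT_MAX_HEADER_LEN - serializedHeader.length);
    pkt.put(serializedHeader);
    return serializedHeader.length;
  }

  public static void main(String[] args) {
    ByteBuffer pkt = ByteBuffer.allocate(PKT_MAX_HEADER_LEN + 4096);
    int headerLen = writeHeader(pkt, new byte[21]);  // pretend 21-byte header
    int headerOff = pkt.position() - headerLen;      // send starts here, not at 0
    int checksumOff = pkt.position();                // checksums follow immediately
    System.out.println("headerOff=" + headerOff + ", checksumOff=" + checksumOff);
  }
}

Because header and checksums end up contiguous, a single write(buf, headerOff, dataOff - headerOff) pushes both out before the data, whether the data then goes via transferTo or out of the same buffer.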

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Thu Aug  9 22:29:36 2012
@@ -33,7 +33,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT;
-
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -52,6 +54,7 @@ class DNConf {
   final boolean syncBehindWrites;
   final boolean dropCacheBehindReads;
   final boolean syncOnClose;
+  final boolean encryptDataTransfer;
   
 
   final long readaheadLength;
@@ -62,6 +65,7 @@ class DNConf {
   final int writePacketSize;
   
   final String minimumNameNodeVersion;
+  final String encryptionAlgorithm;
 
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -117,6 +121,10 @@ class DNConf {
 
     this.minimumNameNodeVersion = conf.get(DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY,
         DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT);
+    
+    this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
+        DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
   }
   
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
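
With these two fields in place, turning wire encryption on is a matter of setting the corresponding keys on the Configuration handed to the DataNode. A small sketch using the same DFSConfigKeys constants the diff imports (the helper class is hypothetical and "3des" is only an example algorithm string):

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;

import org.apache.hadoop.conf.Configuration;

public class EnableEncryptionSketch {
  // Build a Configuration with the new knobs enabled, e.g. for a test cluster.
  static Configuration encryptedConf() {
    Configuration conf = new Configuration();
    conf.setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
    conf.set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "3des"); // example value only
    return conf;
  }
}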

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Aug  9 22:29:36 2012
@@ -53,6 +53,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -98,6 +99,8 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
@@ -141,6 +144,7 @@ import org.apache.hadoop.hdfs.web.WebHdf
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -272,6 +276,7 @@ public class DataNode extends Configured
   private Configuration conf;
 
   private final String userWithLocalPathAccess;
+  ReadaheadPool readaheadPool;
 
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
@@ -666,6 +671,10 @@ public class DataNode extends Configured
 
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(conf);
+
+    // Create the ReadaheadPool from the DataNode context so we can
+    // exit without having to explicitly shutdown its thread pool.
+    readaheadPool = ReadaheadPool.getInstance();
   }
   
   /**
@@ -733,8 +742,6 @@ public class DataNode extends Configured
             + " tokens, or none may be.");
       }
     }
-    // TODO should we check that all federated nns are either enabled or
-    // disabled?
     if (!isBlockTokenEnabled) return;
     
     if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) {
@@ -746,7 +753,8 @@ public class DataNode extends Configured
           + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
           + " min(s)");
       final BlockTokenSecretManager secretMgr = 
-        new BlockTokenSecretManager(0, blockTokenLifetime);
+          new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId,
+              dnConf.encryptionAlgorithm);
       blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
     }
   }
@@ -1386,9 +1394,21 @@ public class DataNode extends Configured
 
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
-        OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
-        out = new DataOutputStream(new BufferedOutputStream(baseStream,
+        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+        InputStream unbufIn = NetUtils.getInputStream(sock);
+        if (dnConf.encryptDataTransfer) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  unbufOut, unbufIn,
+                  blockPoolTokenSecretManager.generateDataEncryptionKey(
+                      b.getBlockPoolId()));
+          unbufOut = encryptedStreams.out;
+          unbufIn = encryptedStreams.in;
+        }
+        
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
+        in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
             false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
@@ -1406,7 +1426,7 @@ public class DataNode extends Configured
             stage, 0, 0, 0, 0, blockSender.getChecksum());
 
         // send data & checksum
-        blockSender.sendBlock(out, baseStream, null);
+        blockSender.sendBlock(out, unbufOut, null);
 
         // no response necessary
         LOG.info(getClass().getSimpleName() + ": Transmitted " + b
@@ -1414,7 +1434,6 @@ public class DataNode extends Configured
 
         // read ack
         if (isClient) {
-          in = new DataInputStream(NetUtils.getInputStream(sock));
           DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
               HdfsProtoUtil.vintPrefixed(in));
           if (LOG.isDebugEnabled()) {
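
The transfer path now builds its streams in a fixed order: raw socket streams first, an optional encryption wrapper next, buffering last, so both the outgoing block data and the incoming close ack travel over the same (possibly encrypted) channel. A JDK-only sketch of that ordering, with pass-through filter streams standing in for the pair returned by DataTransferEncryptor.getEncryptedStreams():

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class TransferStreamSketch {
  static final int SMALL_BUFFER_SIZE = 512; // illustrative

  final DataOutputStream out;
  final DataInputStream in;

  // Wrap-then-buffer: the buffered views sit on top of whatever the
  // optional encryption layer returned, never directly on the socket.
  TransferStreamSketch(Socket sock, boolean encrypt) throws IOException {
    OutputStream unbufOut = sock.getOutputStream();
    InputStream unbufIn = sock.getInputStream();
    if (encrypt) {
      unbufOut = new FilterOutputStream(unbufOut);   // placeholder encrypted out
      unbufIn = new FilterInputStream(unbufIn) {};   // placeholder encrypted in
    }
    this.out = new DataOutputStream(
        new BufferedOutputStream(unbufOut, SMALL_BUFFER_SIZE));
    this.in = new DataInputStream(unbufIn);
  }
}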

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Aug  9 22:29:36 2012
@@ -29,6 +29,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
@@ -43,7 +44,10 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -84,7 +88,8 @@ class DataXceiver extends Receiver imple
   private final DataXceiverServer dataXceiverServer;
 
   private long opStartTime; //the start time of receiving an Op
-  private final SocketInputWrapper socketInputWrapper;
+  private final SocketInputWrapper socketIn;
+  private OutputStream socketOut;
 
   /**
    * Client Name used in previous operation. Not available on first request
@@ -94,23 +99,19 @@ class DataXceiver extends Receiver imple
   
   public static DataXceiver create(Socket s, DataNode dn,
       DataXceiverServer dataXceiverServer) throws IOException {
-    
-    SocketInputWrapper iw = NetUtils.getInputStream(s);
-    return new DataXceiver(s, iw, dn, dataXceiverServer);
+    return new DataXceiver(s, dn, dataXceiverServer);
   }
   
   private DataXceiver(Socket s, 
-      SocketInputWrapper socketInput,
       DataNode datanode, 
       DataXceiverServer dataXceiverServer) throws IOException {
-    super(new DataInputStream(new BufferedInputStream(
-        socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
 
     this.s = s;
-    this.socketInputWrapper = socketInput;
+    this.dnConf = datanode.getDnConf();
+    this.socketIn = NetUtils.getInputStream(s);
+    this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout);
     this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
-    this.dnConf = datanode.getDnConf();
     this.dataXceiverServer = dataXceiverServer;
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
@@ -141,6 +142,10 @@ class DataXceiver extends Receiver imple
 
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
+  
+  private OutputStream getOutputStream() throws IOException {
+    return socketOut;
+  }
 
   /**
    * Read/write data from/to the DataXceiverServer.
@@ -149,8 +154,31 @@ class DataXceiver extends Receiver imple
   public void run() {
     int opsProcessed = 0;
     Op op = null;
+    
     dataXceiverServer.childSockets.add(s);
+    
     try {
+      
+      InputStream input = socketIn;
+      if (dnConf.encryptDataTransfer) {
+        IOStreamPair encryptedStreams = null;
+        try {
+          encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
+              socketIn, datanode.blockPoolTokenSecretManager,
+              dnConf.encryptionAlgorithm);
+        } catch (InvalidMagicNumberException imne) {
+          LOG.info("Failed to read expected encryption handshake from client " +
+              "at " + s.getInetAddress() + ". Perhaps the client is running an " +
+              "older version of Hadoop which does not support encryption.");
+          return;
+        }
+        input = encryptedStreams.in;
+        socketOut = encryptedStreams.out;
+      }
+      input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+      
+      super.initialize(new DataInputStream(input));
+      
       // We process requests in a loop, and stay around for a short timeout.
       // This optimistic behaviour allows the other end to reuse connections.
       // Setting keepalive timeout to 0 disable this behavior.
@@ -160,9 +188,9 @@ class DataXceiver extends Receiver imple
         try {
           if (opsProcessed != 0) {
             assert dnConf.socketKeepaliveTimeout > 0;
-            socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout);
+            socketIn.setTimeout(dnConf.socketKeepaliveTimeout);
           } else {
-            socketInputWrapper.setTimeout(dnConf.socketTimeout);
+            socketIn.setTimeout(dnConf.socketTimeout);
           }
           op = readOp();
         } catch (InterruptedIOException ignored) {
@@ -215,8 +243,7 @@ class DataXceiver extends Receiver imple
       final long length) throws IOException {
     previousOpClientName = clientName;
 
-    OutputStream baseStream = NetUtils.getOutputStream(s, 
-        dnConf.socketWriteTimeout);
+    OutputStream baseStream = getOutputStream();
     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
     checkAccess(out, true, block, blockToken,
@@ -242,13 +269,12 @@ class DataXceiver extends Receiver imple
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e; 
         LOG.info(msg);
-        sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
+        sendResponse(ERROR, msg);
         throw e;
       }
       
       // send op status
-      writeSuccessWithChecksumInfo(blockSender,
-          getStreamWithTimeout(s, dnConf.socketWriteTimeout));
+      writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
 
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
@@ -347,7 +373,7 @@ class DataXceiver extends Receiver imple
     // reply to upstream datanode or client 
     final DataOutputStream replyOut = new DataOutputStream(
         new BufferedOutputStream(
-            NetUtils.getOutputStream(s, dnConf.socketWriteTimeout),
+            getOutputStream(),
             HdfsConstants.SMALL_BUFFER_SIZE));
     checkAccess(replyOut, isClient, block, blockToken,
         Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
@@ -389,11 +415,23 @@ class DataXceiver extends Receiver imple
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
           mirrorSock.setSoTimeout(timeoutValue);
           mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
-          mirrorOut = new DataOutputStream(
-             new BufferedOutputStream(
-                         NetUtils.getOutputStream(mirrorSock, writeTimeout),
-                         HdfsConstants.SMALL_BUFFER_SIZE));
-          mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
+          
+          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
+              writeTimeout);
+          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
+          if (dnConf.encryptDataTransfer) {
+            IOStreamPair encryptedStreams =
+                DataTransferEncryptor.getEncryptedStreams(
+                    unbufMirrorOut, unbufMirrorIn,
+                    datanode.blockPoolTokenSecretManager
+                        .generateDataEncryptionKey(block.getBlockPoolId()));
+            
+            unbufMirrorOut = encryptedStreams.out;
+            unbufMirrorIn = encryptedStreams.in;
+          }
+          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
+              HdfsConstants.SMALL_BUFFER_SIZE));
+          mirrorIn = new DataInputStream(unbufMirrorIn);
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
@@ -520,7 +558,7 @@ class DataXceiver extends Receiver imple
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
 
     final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
+        getOutputStream());
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
       writeResponse(Status.SUCCESS, null, out);
@@ -533,7 +571,7 @@ class DataXceiver extends Receiver imple
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
+        getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
     updateCurrentThreadName("Reading metadata for block " + block);
@@ -593,7 +631,7 @@ class DataXceiver extends Receiver imple
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_COPY_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout);
+        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
         return;
       }
 
@@ -603,7 +641,7 @@ class DataXceiver extends Receiver imple
       String msg = "Not able to copy block " + block.getBlockId() + " to " 
       + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
       LOG.info(msg);
-      sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
+      sendResponse(ERROR, msg);
       return;
     }
 
@@ -617,8 +655,7 @@ class DataXceiver extends Receiver imple
           null);
 
       // set up response stream
-      OutputStream baseStream = NetUtils.getOutputStream(
-          s, dnConf.socketWriteTimeout);
+      OutputStream baseStream = getOutputStream();
       reply = new DataOutputStream(new BufferedOutputStream(
           baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
 
@@ -670,8 +707,7 @@ class DataXceiver extends Receiver imple
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_REPLACE_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
-            dnConf.socketWriteTimeout);
+        sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
         return;
       }
     }
@@ -680,7 +716,7 @@ class DataXceiver extends Receiver imple
       String msg = "Not able to receive block " + block.getBlockId() + " from " 
           + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
       LOG.warn(msg);
-      sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
+      sendResponse(ERROR, msg);
       return;
     }
 
@@ -699,17 +735,29 @@ class DataXceiver extends Receiver imple
       NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
       proxySock.setSoTimeout(dnConf.socketTimeout);
 
-      OutputStream baseStream = NetUtils.getOutputStream(proxySock, 
+      OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
           dnConf.socketWriteTimeout);
-      proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
+      InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
+      if (dnConf.encryptDataTransfer) {
+        IOStreamPair encryptedStreams =
+            DataTransferEncryptor.getEncryptedStreams(
+                unbufProxyOut, unbufProxyIn,
+                datanode.blockPoolTokenSecretManager
+                    .generateDataEncryptionKey(block.getBlockPoolId()));
+        unbufProxyOut = encryptedStreams.out;
+        unbufProxyIn = encryptedStreams.in;
+      }
+      
+      proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
           HdfsConstants.SMALL_BUFFER_SIZE));
+      proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
+          HdfsConstants.IO_FILE_BUFFER_SIZE));
 
       /* send request to the proxy */
       new Sender(proxyOut).copyBlock(block, blockToken);
 
       // receive the response from the proxy
-      proxyReply = new DataInputStream(new BufferedInputStream(
-          NetUtils.getInputStream(proxySock), HdfsConstants.IO_FILE_BUFFER_SIZE));
+      
       BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
           HdfsProtoUtil.vintPrefixed(proxyReply));
 
@@ -762,7 +810,7 @@ class DataXceiver extends Receiver imple
       
       // send response back
       try {
-        sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout);
+        sendResponse(opStatus, errMsg);
       } catch (IOException ioe) {
         LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
       }
@@ -781,20 +829,13 @@ class DataXceiver extends Receiver imple
 
   /**
    * Utility function for sending a response.
-   * @param s socket to write to
+   * 
    * @param opStatus status message to write
-   * @param timeout send timeout
-   **/
-  private static void sendResponse(Socket s, Status status, String message,
-      long timeout) throws IOException {
-    DataOutputStream reply = getStreamWithTimeout(s, timeout);
-    
-    writeResponse(status, message, reply);
-  }
-  
-  private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
-      throws IOException {
-    return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+   * @param message message to send to the client or other DN
+   */
+  private void sendResponse(Status status,
+      String message) throws IOException {
+    writeResponse(status, message, getOutputStream());
   }
 
   private static void writeResponse(Status status, String message, OutputStream out)
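
Around the new setup code, the request loop keeps its existing shape: the first op may wait for the full socket timeout, while later ops on a kept-alive connection only wait for the much shorter keepalive timeout. A JDK-only sketch of that shape (op dispatch and error handling elided; the timeouts and readOp stand-in are illustrative):

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.Socket;

public class RequestLoopSketch {
  static void serve(Socket s, int socketTimeout, int keepaliveTimeout)
      throws IOException {
    DataInputStream in = new DataInputStream(
        new BufferedInputStream(s.getInputStream(), 512));
    int opsProcessed = 0;
    while (true) {
      // First request gets the normal timeout; reused connections get the
      // shorter keepalive timeout, matching the setTimeout calls above.
      s.setSoTimeout(opsProcessed == 0 ? socketTimeout : keepaliveTimeout);
      int op;
      try {
        op = in.read();                // stand-in for readOp()
      } catch (InterruptedIOException e) {
        break;                         // idle keepalive connection: stop serving
      }
      if (op < 0) {
        break;                         // client closed the connection
      }
      // ... process the op ...
      opsProcessed++;
    }
  }
}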

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Thu Aug  9 22:29:36 2012
@@ -606,7 +606,7 @@ public class DatanodeJspHelper {
     try {
       JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
           datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
-          startOffset, chunkSizeToView, out, conf);
+          startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
     } catch (Exception e) {
       out.print(e);
     }
@@ -699,7 +699,7 @@ public class DatanodeJspHelper {
 
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
-        blockSize, startOffset, chunkSizeToView, out, conf);
+        blockSize, startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey());
     out.print("</textarea>");
     dfs.close();
   }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Thu Aug  9 22:29:36 2012
@@ -22,6 +22,7 @@ import java.util.Collection;
 
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
 /**
  * A JournalManager implementation that uses RPCs to log transactions
@@ -39,6 +40,20 @@ class BackupJournalManager implements Jo
   }
 
   @Override
+  public void format(NamespaceInfo nsInfo) {
+    // format() should only get called at startup, before any BNs
+    // can register with the NN.
+    throw new UnsupportedOperationException(
+        "BackupNode journal should never get formatted");
+  }
+  
+  @Override
+  public boolean hasSomeData() {
+    throw new UnsupportedOperationException();
+  }
+
+  
+  @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
     EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
         journalInfo);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Aug  9 22:29:36 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.security.t
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
@@ -333,6 +334,39 @@ public class FSEditLog  {
     state = State.CLOSED;
   }
 
+
+  /**
+   * Format all configured journals which are not file-based.
+   * 
+   * File-based journals are skipped, since they are formatted by the
+   * Storage format code.
+   */
+  void formatNonFileJournals(NamespaceInfo nsInfo) throws IOException {
+    Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
+        "Bad state: %s", state);
+    
+    for (JournalManager jm : journalSet.getJournalManagers()) {
+      if (!(jm instanceof FileJournalManager)) {
+        jm.format(nsInfo);
+      }
+    }
+  }
+  
+  List<FormatConfirmable> getFormatConfirmables() {
+    Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
+        "Bad state: %s", state);
+
+    List<FormatConfirmable> ret = Lists.newArrayList();
+    for (final JournalManager jm : journalSet.getJournalManagers()) {
+      // The FJMs are confirmed separately since they are also
+      // StorageDirectories
+      if (!(jm instanceof FileJournalManager)) {
+        ret.add(jm);
+      }
+    }
+    return ret;
+  }
+
   /**
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu Aug  9 22:29:36 2012
@@ -40,10 +40,12 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
 import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.util.Time.now;
+
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 
@@ -139,11 +141,34 @@ public class FSImage implements Closeabl
         fileCount + " files");
     NamespaceInfo ns = NNStorage.newNamespaceInfo();
     ns.clusterID = clusterId;
+    
     storage.format(ns);
+    editLog.formatNonFileJournals(ns);
     saveFSImageInAllDirs(fsn, 0);
   }
   
   /**
+   * Check whether the storage directories and non-file journals exist.
+   * If running in interactive mode, will prompt the user for each
+   * directory to allow them to format anyway. Otherwise, returns
+   * false, unless 'force' is specified.
+   * 
+   * @param force format regardless of whether dirs exist
+   * @param interactive prompt the user when a dir exists
+   * @return true if formatting should proceed
+   * @throws IOException if some storage cannot be accessed
+   */
+  boolean confirmFormat(boolean force, boolean interactive) throws IOException {
+    List<FormatConfirmable> confirms = Lists.newArrayList();
+    for (StorageDirectory sd : storage.dirIterable(null)) {
+      confirms.add(sd);
+    }
+    
+    confirms.addAll(editLog.getFormatConfirmables());
+    return Storage.confirmFormat(confirms, force, interactive);
+  }
+  
+  /**
    * Analyze storage directories.
    * Recover from previous transitions if required. 
    * Perform fs state transition if necessary depending on the namespace info.
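
The new FSImage.confirmFormat collects every StorageDirectory and every non-file journal as a FormatConfirmable and hands the whole list to Storage.confirmFormat. A rough sketch of the force / interactive decision a confirm routine of this kind makes, assuming a simplified single-method interface (none of the names below are the actual Hadoop APIs):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;

public class ConfirmFormatSketch {
    // Simplified stand-in for FormatConfirmable; not the real Hadoop interface.
    interface Confirmable {
        boolean hasSomeData() throws IOException;   // true if formatting would destroy data
    }

    static boolean confirmFormat(List<Confirmable> targets,
                                 boolean force, boolean interactive) throws IOException {
        Scanner in = new Scanner(System.in);
        for (Confirmable c : targets) {
            if (!c.hasSomeData()) {
                continue;                             // empty: nothing to confirm
            }
            if (force) {
                System.err.println(c + " has data. Formatting anyway.");
                continue;
            }
            if (!interactive) {
                System.err.println(c + " has data; running non-interactively. Not formatting.");
                return false;
            }
            System.err.print("Re-format " + c + "? (Y or N) ");
            if (!in.nextLine().trim().equalsIgnoreCase("y")) {
                return false;                         // user declined
            }
        }
        return true;                                  // safe to proceed
    }

    public static void main(String[] args) throws IOException {
        Confirmable empty = () -> false;              // nothing to lose, so formatting proceeds
        System.out.println("proceed=" + confirmFormat(Arrays.asList(empty), false, false));
    }
}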

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Aug  9 22:29:36 2012
@@ -25,6 +25,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@@ -476,7 +478,8 @@ public class FSNamesystem implements Nam
           conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
           conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
           (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
-          conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
+          conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
+          conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT));
       
       this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
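
The two new DFS_ENCRYPT_DATA_TRANSFER constants wire the data-transfer-encryption flag into the FSNamesystem defaults. A small usage sketch, assuming hadoop-common is on the classpath and that the key resolves to "dfs.encrypt.data.transfer" with a default of false (both are assumptions, since the patch only references the constants):

import org.apache.hadoop.conf.Configuration;

public class EncryptDataTransferConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The literal key and the false default are assumptions here; the patch
        // reads them through DFS_ENCRYPT_DATA_TRANSFER_KEY / _DEFAULT.
        boolean encrypt = conf.getBoolean("dfs.encrypt.data.transfer", false);
        System.out.println("data transfer encryption enabled: " + encrypt);
    }
}
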
@@ -1752,8 +1755,6 @@ public class FSNamesystem implements Nam
 
     try {
       INodeFile myFile = dir.getFileINode(src);
-      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
-
       try {
         blockManager.verifyReplication(src, replication, clientMachine);
       } catch(IOException e) {
@@ -1769,10 +1770,15 @@ public class FSNamesystem implements Nam
         // File exists - must be one of append or overwrite
         if (overwrite) {
           delete(src, true);
-        } else if (!append) {
-          throw new FileAlreadyExistsException("failed to create file " + src
-              + " on client " + clientMachine
-              + " because the file exists");
+        } else {
+          // Opening an existing file for write - may need to recover lease.
+          recoverLeaseInternal(myFile, src, holder, clientMachine, false);
+
+          if (!append) {
+            throw new FileAlreadyExistsException("failed to create file " + src
+                + " on client " + clientMachine
+                + " because the file exists");
+          }
         }
       }
 
@@ -2031,6 +2037,7 @@ public class FSNamesystem implements Nam
   
   void setBlockPoolId(String bpid) {
     blockPoolId = bpid;
+    blockManager.setBlockPoolId(blockPoolId);
   }
 
   /**
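
The create-path change above moves recoverLeaseInternal so that lease recovery is attempted only when an existing file is being reopened rather than created or overwritten. A simplified sketch of the resulting branch order, with hypothetical helpers standing in for the FSNamesystem internals:

public class StartFileBranchSketch {
    // Hypothetical stand-ins for the FSNamesystem internals touched above.
    static void delete(String src) { System.out.println("delete " + src); }
    static void recoverLease(String src) { System.out.println("recover lease on " + src); }

    static void startFile(boolean fileExists, boolean overwrite, boolean append, String src)
            throws Exception {
        if (fileExists) {
            if (overwrite) {
                delete(src);               // overwrite: old file goes away, no lease to recover
            } else {
                recoverLease(src);         // reopening an existing file: recover its lease first
                if (!append) {
                    throw new Exception("failed to create " + src + " because the file exists");
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        startFile(true, false, true, "/demo/file");   // append to an existing file
    }
}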

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Thu Aug  9 22:29:36 2012
@@ -127,7 +127,7 @@ public class FileChecksumServlets {
             datanode, conf, getUGI(request, conf));
         final ClientProtocol nnproxy = dfs.getNamenode();
         final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
-            path, nnproxy, socketFactory, socketTimeout);
+            path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey());
         MD5MD5CRC32FileChecksum.write(xml, checksum);
       } catch(IOException ioe) {
         writeXml(ioe, path, xml);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Thu Aug  9 22:29:36 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -81,6 +82,22 @@ public class FileJournalManager implemen
   public void close() throws IOException {}
   
   @Override
+  public void format(NamespaceInfo ns) {
+    // Formatting file journals is done by the StorageDirectory
+    // format code, since they may share their directory with
+    // checkpoints, etc.
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public boolean hasSomeData() {
+    // Formatting file journals is done by the StorageDirectory
+    // format code, since they may share their directory with
+    // checkpoints, etc.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   synchronized public EditLogOutputStream startLogSegment(long txid) 
       throws IOException {
     try {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Thu Aug  9 22:29:36 2012
@@ -23,6 +23,8 @@ import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
 /**
  * A JournalManager is responsible for managing a single place of storing
@@ -33,7 +35,14 @@ import org.apache.hadoop.classification.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface JournalManager extends Closeable {
+public interface JournalManager extends Closeable, FormatConfirmable {
+
+  /**
+   * Format the underlying storage, removing any previously
+   * stored data.
+   */
+  void format(NamespaceInfo ns);
+
   /**
    * Begin writing to a new segment of the log stream, which starts at
    * the given transaction ID.
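
With format(NamespaceInfo) and the FormatConfirmable methods on the interface, each non-file journal implementation has to be able both to wipe itself and to report whether it currently holds data. A toy in-memory example of that contract, purely illustrative and not a real Hadoop journal:

import java.util.ArrayList;
import java.util.List;

public class InMemoryJournalSketch {
    // Toy stand-in for the JournalManager + FormatConfirmable contract.
    interface Journal {
        void format(String nsInfo);   // wipe any previously stored edits
        boolean hasSomeData();        // would a format destroy existing data?
    }

    static class InMemoryJournal implements Journal {
        private final List<String> edits = new ArrayList<>();

        @Override
        public void format(String nsInfo) {
            edits.clear();            // discard everything written so far
        }

        @Override
        public boolean hasSomeData() {
            return !edits.isEmpty();
        }

        void write(String op) { edits.add(op); }
    }

    public static void main(String[] args) {
        InMemoryJournal j = new InMemoryJournal();
        j.write("OP_ADD /foo");
        System.out.println("before format: hasSomeData=" + j.hasSomeData());
        j.format("ns-demo");
        System.out.println("after format:  hasSomeData=" + j.hasSomeData());
    }
}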

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Thu Aug  9 22:29:36 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.
 
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 
@@ -173,6 +174,20 @@ public class JournalSet implements Journ
   }
   
   @Override
+  public void format(NamespaceInfo nsInfo) {
+    // The iteration is done by FSEditLog itself
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean hasSomeData() throws IOException {
+    // This is called individually on the underlying journals,
+    // not on the JournalSet.
+    throw new UnsupportedOperationException();
+  }
+
+  
+  @Override
   public EditLogOutputStream startLogSegment(final long txId) throws IOException {
     mapJournalsAndReportErrors(new JournalClosure() {
       @Override

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Thu Aug  9 22:29:36 2012
@@ -28,7 +28,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,7 +57,6 @@ import org.apache.hadoop.util.Time;
 import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * NNStorage is responsible for management of the StorageDirectories used by

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Aug  9 22:29:36 2012
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
@@ -709,9 +710,6 @@ public class NameNode {
     dirsToPrompt.addAll(sharedDirs);
     List<URI> editDirsToFormat = 
                  FSNamesystem.getNamespaceEditsDirs(conf);
-    if (!confirmFormat(dirsToPrompt, force, isInteractive)) {
-      return true; // aborted
-    }
 
     // if clusterID is not provided - see if you can find the current one
     String clusterId = StartupOption.FORMAT.getClusterId();
@@ -723,62 +721,16 @@ public class NameNode {
     
     FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
     FSNamesystem fsn = new FSNamesystem(conf, fsImage);
+    fsImage.getEditLog().initJournalsForWrite();
+    
+    if (!fsImage.confirmFormat(force, isInteractive)) {
+      return true; // aborted
+    }
+    
     fsImage.format(fsn, clusterId);
     return false;
   }
 
-  /**
-   * Check whether the given storage directories already exist.
-   * If running in interactive mode, will prompt the user for each
-   * directory to allow them to format anyway. Otherwise, returns
-   * false, unless 'force' is specified.
-   * 
-   * @param dirsToFormat the dirs to check
-   * @param force format regardless of whether dirs exist
-   * @param interactive prompt the user when a dir exists
-   * @return true if formatting should proceed
-   * @throws IOException
-   */
-  public static boolean confirmFormat(Collection<URI> dirsToFormat,
-      boolean force, boolean interactive)
-      throws IOException {
-    for(Iterator<URI> it = dirsToFormat.iterator(); it.hasNext();) {
-      URI dirUri = it.next();
-      if (!dirUri.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
-        System.err.println("Skipping format for directory \"" + dirUri
-            + "\". Can only format local directories with scheme \""
-            + NNStorage.LOCAL_URI_SCHEME + "\".");
-        continue;
-      }
-      // To validate only file based schemes are formatted
-      assert dirUri.getScheme().equals(NNStorage.LOCAL_URI_SCHEME) :
-        "formatting is not supported for " + dirUri;
-
-      File curDir = new File(dirUri.getPath());
-      // Its alright for a dir not to exist, or to exist (properly accessible)
-      // and be completely empty.
-      if (!curDir.exists() ||
-          (curDir.isDirectory() && FileUtil.listFiles(curDir).length == 0))
-        continue;
-      if (force) { // Don't confirm, always format.
-        System.err.println(
-            "Storage directory exists in " + curDir + ". Formatting anyway.");
-        continue;
-      }
-      if (!interactive) { // Don't ask - always don't format
-        System.err.println(
-            "Running in non-interactive mode, and image appears to exist in " +
-            curDir + ". Not formatting.");
-        return false;
-      }
-      if (!confirmPrompt("Re-format filesystem in " + curDir + " ?")) {
-        System.err.println("Format aborted in " + curDir);
-        return false;
-      }
-    }
-    return true;
-  }
-
   public static void checkAllowFormat(Configuration conf) throws IOException {
     if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY, 
         DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
@@ -822,17 +774,26 @@ public class NameNode {
           FSNamesystem.getNamespaceEditsDirs(conf, false));
       
       existingStorage = fsns.getFSImage().getStorage();
+      NamespaceInfo nsInfo = existingStorage.getNamespaceInfo();
       
-      Collection<URI> sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
-      if (!confirmFormat(sharedEditsDirs, force, interactive)) {
-        return true; // aborted
-      }
-      NNStorage newSharedStorage = new NNStorage(conf,
+      List<URI> sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
+      
+      FSImage sharedEditsImage = new FSImage(conf,
           Lists.<URI>newArrayList(),
           sharedEditsDirs);
+      sharedEditsImage.getEditLog().initJournalsForWrite();
       
-      newSharedStorage.format(existingStorage.getNamespaceInfo());
+      if (!sharedEditsImage.confirmFormat(force, interactive)) {
+        return true; // abort
+      }
       
+      NNStorage newSharedStorage = sharedEditsImage.getStorage();
+      // Call Storage.format instead of FSImage.format here, since we don't
+      // actually want to save a checkpoint - just prime the dirs with
+      // the existing namespace info
+      newSharedStorage.format(nsInfo);
+      sharedEditsImage.getEditLog().formatNonFileJournals(nsInfo);
+
       // Need to make sure the edit log segments are in good shape to initialize
       // the shared edits dir.
       fsns.getFSImage().getEditLog().close();
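
Both the format and initializeSharedEdits paths now follow the same ordering: build the FSImage, call initJournalsForWrite on its edit log, run confirmFormat, and only then format. A hedged outline of that ordering with placeholder types (not the real FSImage/FSEditLog classes):

public class FormatFlowSketch {
    // Placeholder types; the real flow goes through FSImage, FSEditLog and FSNamesystem.
    static class EditLog {
        void initJournalsForWrite() { System.out.println("journals opened for write"); }
    }

    static class Image {
        final EditLog editLog = new EditLog();
        boolean confirmFormat(boolean force, boolean interactive) {
            // The real method checks every storage dir and non-file journal;
            // here we just pretend nothing holds data.
            return true;
        }
        void format(String clusterId) { System.out.println("formatted cluster " + clusterId); }
    }

    static boolean format(boolean force, boolean interactive, String clusterId) {
        Image image = new Image();
        image.editLog.initJournalsForWrite();          // 1. open journals so they can be inspected
        if (!image.confirmFormat(force, interactive)) {
            return true;                               // 2. aborted: existing data, user declined
        }
        image.format(clusterId);                       // 3. only now wipe and re-initialize
        return false;
    }

    public static void main(String[] args) {
        System.out.println("aborted=" + format(false, true, "CID-demo"));
    }
}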

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Aug  9 22:29:36 2012
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@@ -1046,4 +1047,9 @@ class NameNodeRpcServer implements Namen
     }
     return clientMachine;
   }
+
+  @Override
+  public DataEncryptionKey getDataEncryptionKey() throws IOException {
+    return namesystem.getBlockManager().generateDataEncryptionKey();
+  }
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Aug  9 22:29:36 2012
@@ -560,7 +560,8 @@ public class NamenodeFsck {
             block.getBlockId());
         blockReader = BlockReaderFactory.newBlockReader(
             conf, s, file, block, lblock
-            .getBlockToken(), 0, -1);
+            .getBlockToken(), 0, -1,
+            namenode.getRpcServer().getDataEncryptionKey());
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java Thu Aug  9 22:29:36 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -55,7 +56,6 @@ import org.apache.hadoop.util.ToolRunner
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 
 /**
  * Tool which allows the standby node's storage directories to be bootstrapped
@@ -171,19 +171,18 @@ public class BootstrapStandby implements
         "           Layout version: " + nsInfo.getLayoutVersion() + "\n" +
         "=====================================================");
 
+    long imageTxId = proxy.getMostRecentCheckpointTxId();
+    long curTxId = proxy.getTransactionID();
+    
+    NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
+    
     // Check with the user before blowing away data.
-    if (!NameNode.confirmFormat(
-            Sets.union(Sets.newHashSet(dirsToFormat),
-                Sets.newHashSet(editUrisToFormat)),
+    if (!Storage.confirmFormat(storage.dirIterable(null),
             force, interactive)) {
       return ERR_CODE_ALREADY_FORMATTED;
     }
     
-    long imageTxId = proxy.getMostRecentCheckpointTxId();
-    long curTxId = proxy.getTransactionID();
-    
     // Format the storage (writes VERSION file)
-    NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
     storage.format(nsInfo);
 
     // Load the newly formatted image, using all of the directories (including shared

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Thu Aug  9 22:29:36 2012
@@ -259,7 +259,6 @@ public class DelegationTokenFetcher {
     try {
       URL url = new URL(buf.toString());
       connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
-      connection = (HttpURLConnection)URLUtils.openConnection(url);
       if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
         throw new IOException("Error renewing token: " + 
             connection.getResponseMessage());

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Thu Aug  9 22:29:36 2012
@@ -292,7 +292,11 @@ public class WebHdfsFileSystem extends F
             + ", message=" + conn.getResponseMessage(), e);
       }
 
-      if (m.get(RemoteException.class.getSimpleName()) == null) {
+      if (m == null) {
+        throw new IOException("Unexpected HTTP response: code=" + code + " != "
+            + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
+            + ", message=" + conn.getResponseMessage());
+      } else if (m.get(RemoteException.class.getSimpleName()) == null) {
         return m;
       }
 
@@ -529,6 +533,7 @@ public class WebHdfsFileSystem extends F
       
       //Step 2) Submit another Http request with the URL from the Location header with data.
       conn = (HttpURLConnection)new URL(redirect).openConnection();
+      conn.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM);
       conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
       connect();
       return conn;
@@ -541,7 +546,9 @@ public class WebHdfsFileSystem extends F
     void getResponse(boolean getJsonAndDisconnect) throws IOException {
       try {
         connect();
-        if (!redirected && op.getRedirect()) {
+        final int code = conn.getResponseCode();
+        if (!redirected && op.getRedirect()
+            && code != op.getExpectedHttpResponseCode()) {
           final String redirect = conn.getHeaderField("Location");
           json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
               conn, false);
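
The WebHdfsFileSystem changes tighten the two-step write: the Location redirect is followed only when the first response code differs from the one the operation expects, and the follow-up request sets an octet-stream Content-Type and 32 kB chunked streaming. A rough client-side sketch with plain HttpURLConnection (the URL and expected code below are illustrative, not taken from the patch):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class TwoStepPutSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative endpoint; a real WebHDFS create would target
        // http://<namenode>:<port>/webhdfs/v1/<path>?op=CREATE
        URL url = new URL("http://localhost:50070/webhdfs/v1/demo?op=CREATE");

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.connect();

        int code = conn.getResponseCode();
        int expected = HttpURLConnection.HTTP_CREATED;      // what the op ultimately expects
        if (code != expected) {
            // Step 2: only follow the redirect if we did not already get the final answer.
            String redirect = conn.getHeaderField("Location");
            conn.disconnect();

            conn = (HttpURLConnection) new URL(redirect).openConnection();
            conn.setRequestMethod("PUT");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/octet-stream");
            conn.setChunkedStreamingMode(32 << 10);          // 32 kB chunks
            try (OutputStream out = conn.getOutputStream()) {
                out.write("hello".getBytes("UTF-8"));
            }
            System.out.println("final response: " + conn.getResponseCode());
        }
    }
}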

Propchange: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1367365-1371513

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt Thu Aug  9 22:29:36 2012
@@ -71,4 +71,18 @@ IF(FUSE_FOUND)
         m
         pthread
     )
+    add_executable(test_fuse_dfs
+        test/test_fuse_dfs.c
+        test/fuse_workload.c
+    )
+    target_link_libraries(test_fuse_dfs
+        ${FUSE_LIBRARIES}
+        native_mini_dfs
+        posix_util
+        pthread
+    )
+ELSE(FUSE_FOUND)
+    IF(REQUIRE_FUSE)
+        MESSAGE(FATAL_ERROR "Required component fuse_dfs could not be built.")
+    ENDIF(REQUIRE_FUSE)
 ENDIF(FUSE_FOUND)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h Thu Aug  9 22:29:36 2012
@@ -78,7 +78,7 @@
     do { \
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
-        if (__my_ret__) { \
+        if (!__my_ret__) { \
             fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
 		    "code %d (errno: %d): got zero from %s\n", __LINE__, \
                 __my_ret__, __my_errno__, #x); \
@@ -98,4 +98,23 @@
         } \
     } while (0);
 
+#define EXPECT_INT_EQ(x, y) \
+    do { \
+        int __my_ret__ = y; \
+        int __my_errno__ = errno; \
+        if (__my_ret__ != (x)) { \
+            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+              "code %d (errno: %d): expected %d\n", \
+               __LINE__, __my_ret__, __my_errno__, (x)); \
+            return -1; \
+        } \
+    } while (0);
+
+#define RETRY_ON_EINTR_GET_ERRNO(ret, expr) do { \
+    ret = expr; \
+    if (!ret) \
+        break; \
+    ret = -errno; \
+    } while (ret == -EINTR);
+
 #endif


