hadoop-hdfs-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Aaron T. Myers (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (HDFS-3721) hsync support broke wire compatibility
Date Sat, 04 Aug 2012 20:17:02 GMT

     [ https://issues.apache.org/jira/browse/HDFS-3721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Aaron T. Myers updated HDFS-3721:
---------------------------------

    Attachment: hdfs-3721.txt

Todd's been busy with some other stuff so I'm going to take over working on this issue.

Attached is an updated patch which addresses Suresh's and my feedback.

During testing, I also discovered that Todd's original patch introduced an unfortunate interaction
with Nagle's algorithm in TCP for small packets during writing. This was the cause of the
TestFileConcurrentReader failure. So, in addition to addressing the feedback, this patch also
changes DFSOutputStream.Packet#writeTo back to only ever call OutputStream#write once, by
ensuring that the packet header, checksum data, and actual data are all in a single contiguous
buffer, similarly to how it was done before Todd's original patch.

To make review easier, here is the functional difference between this patch and the last
one:

{code}
commit 61fe911e60fe4db710813fc97a209456722ef1f7
Author: Aaron T. Myers <atm@cloudera.com>
Date:   Sat Aug 4 12:32:11 2012 -0700

    Send one buffer when writing to avoid nagling.

diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index e460a3c..4582712 100644
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -147,10 +147,17 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable
{
      * buf is pointed into like follows:
      *  (C is checksum data, D is payload data)
      *
-     * [CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
-     *           ^               ^               ^
-     *           checksumPos     dataStart       dataPos
+     * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+     *           ^        ^               ^               ^
+     *           |        checksumPos     dataStart       dataPos
+     *           checksumStart
+     * 
+     * Right before sending, we move the checksum data to immediately precede
+     * the actual data, and then insert the header into the buffer immediately
+     * preceding the checksum data, so we make sure to keep enough space in
+     * front of the checksum data to support the largest conceivable header. 
      */
+    int checksumStart;
     int checksumPos;
     int dataStart;
     int dataPos;
@@ -166,9 +173,9 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable
{
       this.offsetInBlock = 0;
       this.seqno = HEART_BEAT_SEQNO;
       
-      buf = new byte[0];
+      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
       
-      checksumPos = dataPos = dataStart = 0;
+      checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
       maxChunks = 0;
     }
     
@@ -180,10 +187,11 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable
{
       this.seqno = currentSeqno;
       currentSeqno++;
       
-      buf = new byte[pktSize];
+      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
       
-      checksumPos = 0;
-      dataStart = chunksPerPkt * checksum.getChecksumSize();
+      checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
+      checksumPos = checksumStart;
+      dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
       dataPos = dataStart;
       maxChunks = chunksPerPkt;
     }
@@ -205,19 +213,38 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable
{
     }
     
     /**
-     * Returns ByteBuffer that contains one full packet, including header.
+     * Write the full packet, including the header, to the given output stream.
      */
     void writeTo(DataOutputStream stm) throws IOException {
-      int dataLen = dataPos - dataStart;
-      int checksumLen = checksumPos;
-      int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+      final int dataLen = dataPos - dataStart;
+      final int checksumLen = checksumPos - checksumStart;
+      final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
 
       PacketHeader header = new PacketHeader(
         pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
       
-      header.write(stm);
-      stm.write(buf, 0, checksumLen);
-      stm.write(buf, dataStart, dataLen);
+      if (checksumPos != dataStart) {
+        // Move the checksum to cover the gap. This can happen for the last
+        // packet or during an hflush/hsync call.
+        System.arraycopy(buf, checksumStart, buf, 
+                         dataStart - checksumLen , checksumLen); 
+        checksumPos = dataStart;
+        checksumStart = checksumPos - checksumLen;
+      }
+      
+      final int headerStart = checksumStart - header.getSerializedSize();
+      assert checksumStart + 1 >= header.getSerializedSize();
+      assert checksumPos == dataStart;
+      assert headerStart >= 0;
+      assert headerStart + header.getSerializedSize() == checksumStart;
+      
+      // Copy the header data into the buffer immediately preceding the checksum
+      // data.
+      System.arraycopy(header.getBytes(), 0, buf, headerStart,
+          header.getSerializedSize());
+      
+      // Write the now contiguous full packet to the output stream.
+      stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
     }
     
     // get the packet's last byte's offset in the block
diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
index 1709101..dd56962 100644
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
@@ -166,6 +166,12 @@ public class PacketHeader {
     out.writeShort(proto.getSerializedSize());
     proto.writeTo(out);
   }
+  
+  public byte[] getBytes() {
+    ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+    putInBuffer(buf);
+    return buf.array();
+  }
 
   /**
    * Perform a sanity check on the packet, returning true if it is sane.
{code}
                
> hsync support broke wire compatibility
> --------------------------------------
>
>                 Key: HDFS-3721
>                 URL: https://issues.apache.org/jira/browse/HDFS-3721
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: data-node, hdfs client
>    Affects Versions: 2.1.0-alpha
>            Reporter: Todd Lipcon
>            Assignee: Todd Lipcon
>            Priority: Critical
>         Attachments: hdfs-3721.txt, hdfs-3721.txt
>
>
> HDFS-744 added support for hsync to the data transfer wire protocol. However, it actually
broke wire compatibility: if the client has hsync support but the server does not, the client
cannot read or write data on the old cluster.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message