From: hairong@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Subject: svn commit: r888718 - in /hadoop/common/branches/branch-0.20: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/test/org/apache/hadoop/hdfs/
Date: Wed, 09 Dec 2009 07:29:29 -0000
Message-Id: <20091209072929.5818C238889D@eris.apache.org>
X-Mailer: svnmailer-1.0.8

Author: hairong
Date: Wed Dec  9 07:29:28 2009
New Revision: 888718

URL: http://svn.apache.org/viewvc?rev=888718&view=rev
Log:
Move the change of HDFS-793 from trunk to branch 0.20

Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=888718&r1=888717&r2=888718&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Wed Dec  9 07:29:28 2009
@@ -75,6 +75,10 @@
     MAPREDUCE-1182. Fix overflow in reduce causing allocations to exceed the
     configured threshold. (cdouglas)
 
+    HDFS-793. Data node should receive the whole packet ack message before
+    it constructs and sends its own ack message for the packet. (hairong)
+
+
 Release 0.20.1 - 2009-09-01
 
   INCOMPATIBLE CHANGES
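The CHANGES.txt entry above captures the whole fix: a datanode must read the complete ack from its downstream mirror before it constructs and forwards its own. The sketch below (an illustration only, not part of this commit; the class and constant names are invented stand-ins) contrasts the old streamed ack encoding with the new length-prefixed encoding that makes reading a whole ack possible:

  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  /** Illustration only: old vs. new ack encodings, using plain java.io. */
  public class AckFormatSketch {
    static final short OP_STATUS_SUCCESS = 0;

    // Old format: seqno, then each datanode streamed its reply short
    // separately, so a reader could block mid-ack waiting on downstream bytes.
    static byte[] oldFormat(long seqno, short[] replies) throws IOException {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeLong(seqno);                    // 8 bytes
      for (short r : replies) {
        out.writeShort(r);                     // 2 bytes each, arrival unknown
      }
      return buf.toByteArray();
    }

    // New format (version 18): seqno, numberOfReplies, reply0, reply1, ...
    // A datanode can read the entire downstream ack before building its own.
    static byte[] newFormat(long seqno, short[] replies) throws IOException {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeLong(seqno);
      out.writeShort((short) replies.length);  // length prefix: the key change
      for (short r : replies) {
        out.writeShort(r);
      }
      return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
      short[] replies = {OP_STATUS_SUCCESS, OP_STATUS_SUCCESS, OP_STATUS_SUCCESS};
      System.out.println("old: " + oldFormat(100L, replies).length + " bytes");
      System.out.println("new: " + newFormat(100L, replies).length + " bytes");
    }
  }

The numberOfReplies prefix is the point: without it, a reader cannot know how many reply shorts belong to one ack except by consulting pipeline state.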
Modified: hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=888718&r1=888717&r2=888718&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Wed Dec  9 07:29:28 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -2398,14 +2399,18 @@
       public void run() {
 
         this.setName("ResponseProcessor for block " + block);
+        PipelineAck ack = new PipelineAck();
 
         while (!closed && clientRunning && !lastPacketInBlock) {
           // process responses from datanodes.
           try {
-            // verify seqno from datanode
-            long seqno = blockReplyStream.readLong();
-            LOG.debug("DFSClient received ack for seqno " + seqno);
-            if (seqno == -1) {
+            // read an ack from the pipeline
+            ack.readFields(blockReplyStream);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("DFSClient " + ack);
+            }
+            long seqno = ack.getSeqno();
+            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
               continue;
             } else if (seqno == -2) {
               // do nothing
@@ -2423,8 +2428,8 @@
             }
 
             // processes response status from all datanodes.
-            for (int i = 0; i < targets.length && clientRunning; i++) {
-              short reply = blockReplyStream.readShort();
+            for (int i = ack.getNumOfReplies()-1; i >= 0 && clientRunning; i--) {
+              short reply = ack.getReply(i);
               if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
                 errorIndex = i; // first bad datanode
                 throw new IOException("Bad response " + reply +
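To make the new ResponseProcessor logic above concrete, here is a minimal, Hadoop-free sketch of the consuming side (illustration only; HEART_BEAT_SEQNO and the OP_STATUS_* values are local stand-ins for the real protocol constants, and process() only loosely mirrors the patched loop):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  /** Illustration only: decode one version-18 ack and find a failed reply. */
  public class AckConsumerSketch {
    static final long HEART_BEAT_SEQNO = -1L;
    static final short OP_STATUS_SUCCESS = 0;
    static final short OP_STATUS_ERROR = 1;

    /** Returns the index of a failed reply, or -1 if all succeeded. */
    static int process(DataInputStream in) throws IOException {
      long seqno = in.readLong();
      short numOfReplies = in.readShort();
      short[] replies = new short[numOfReplies];
      for (int i = 0; i < numOfReplies; i++) {
        replies[i] = in.readShort();
      }
      if (seqno == HEART_BEAT_SEQNO) {
        return -1; // heartbeat: carries no replies to verify
      }
      // Like the patched loop, scan from the farthest datanode back.
      for (int i = numOfReplies - 1; i >= 0; i--) {
        if (replies[i] != OP_STATUS_SUCCESS) {
          return i;
        }
      }
      return -1;
    }

    public static void main(String[] args) throws IOException {
      // Encode one ack: seqno=7, two replies, the second one failed.
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeLong(7L);
      out.writeShort((short) 2);
      out.writeShort(OP_STATUS_SUCCESS);
      out.writeShort(OP_STATUS_ERROR);
      DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
      System.out.println("bad datanode index: " + process(in)); // prints 1
    }
  }

Note that the client now learns the reply count from the ack itself rather than from targets.length, so a pipeline that broke partway produces a shorter, still well-formed ack.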
Modified: hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=888718&r1=888717&r2=888718&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed Dec  9 07:29:28 2009
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
 
 /**
  *
@@ -31,15 +36,11 @@
  * when protocol changes. It is not very obvious.
  */
 /*
- * Version 14:
- *    OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
- *    including the block id, source, and proxy.
- *    OP_COPY_BLOCK is sent from the destination to the proxy, which contains
- *    only the block id.
- *    A reply to OP_COPY_BLOCK sends the block content.
- *    A reply to OP_REPLACE_BLOCK includes an operation status.
+ * Version 18:
+ *    Change the block packet ack protocol to include seqno,
+ *    numberOfReplies, reply0, reply1, ...
  */
-  public static final int DATA_TRANSFER_VERSION = 14;
+  public static final int DATA_TRANSFER_VERSION = 18;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -56,6 +57,97 @@
   public static final int OP_STATUS_ERROR_EXISTS = 4;
   public static final int OP_STATUS_CHECKSUM_OK = 5;
 
-
-
+  /** reply **/
+  public static class PipelineAck implements Writable {
+    private long seqno;
+    private short replies[];
+    final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new short[0]);
+
+    /** default constructor **/
+    public PipelineAck() {
+    }
+
+    /**
+     * Constructor
+     * @param seqno sequence number
+     * @param replies an array of replies
+     */
+    public PipelineAck(long seqno, short[] replies) {
+      this.seqno = seqno;
+      this.replies = replies;
+    }
+
+    /**
+     * Get the sequence number
+     * @return the sequence number
+     */
+    public long getSeqno() {
+      return seqno;
+    }
+
+    /**
+     * Get the number of replies
+     * @return the number of replies
+     */
+    public short getNumOfReplies() {
+      return (short)replies.length;
+    }
+
+    /**
+     * Get the i-th reply
+     * @return the i-th reply
+     */
+    public short getReply(int i) {
+      return replies[i];
+    }
+
+    /**
+     * Check if this ack contains an error status
+     * @return true if all statuses are SUCCESS
+     */
+    public boolean isSuccess() {
+      for (short reply : replies) {
+        if (reply != OP_STATUS_SUCCESS) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**** Writable interface ****/
+    @Override // Writable
+    public void readFields(DataInput in) throws IOException {
+      seqno = in.readLong();
+      short numOfReplies = in.readShort();
+      replies = new short[numOfReplies];
+      for (int i=0; i<numOfReplies; i++) {
+        replies[i] = in.readShort();
+      }
+    }
+
+    @Override // Writable
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(seqno);
+      out.writeShort((short)replies.length);
+      for (short reply : replies) {
+        out.writeShort(reply);
+      }
+    }
+
+    @Override // Object
+    public String toString() {
+      StringBuilder ack = new StringBuilder("Replies for seqno ");
+      ack.append(seqno).append(" are");
+      for (short reply : replies) {
+        ack.append(" ");
+        if (reply == OP_STATUS_SUCCESS) {
+          ack.append("SUCCESS");
+        } else {
+          ack.append("FAILED");
+        }
+      }
+      return ack.toString();
+    }
+  }
 }
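Since PipelineAck is just a Writable over (seqno, numberOfReplies, reply0, reply1, ...), its encoding can be exercised without any Hadoop classes. A quick round-trip sketch of the version-18 format (illustration only; the write/read code below restates the diff's logic in plain java.io, and the assert needs -ea to fire):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.util.Arrays;

  /** Illustration only: lossless round trip of the ack wire format. */
  public class PipelineAckRoundTrip {
    public static void main(String[] args) throws IOException {
      long seqno = 42L;
      short[] replies = {0, 0, 5}; // e.g. SUCCESS, SUCCESS, CHECKSUM_OK

      // The write() side of the encoding.
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeLong(seqno);
      out.writeShort((short) replies.length);
      for (short reply : replies) {
        out.writeShort(reply);
      }

      // The readFields() side of the encoding.
      DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
      long seqno2 = in.readLong();
      short[] replies2 = new short[in.readShort()];
      for (int i = 0; i < replies2.length; i++) {
        replies2[i] = in.readShort();
      }

      // The round trip must be lossless for upstream nodes to trust the ack.
      assert seqno == seqno2 && Arrays.equals(replies, replies2);
      System.out.println("seqno=" + seqno2 + " replies=" + Arrays.toString(replies2));
    }
  }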
IOException("PacketResponder " + numTargets + " for block " + block + @@ -909,10 +912,6 @@ continue; } - if (!didRead) { - op = DataTransferProtocol.OP_STATUS_ERROR; - } - // If this is the last packet in block, then close block // file and finalize the block before responding success if (lastPacketInBlock && !receiver.finalized) { @@ -935,43 +934,34 @@ } } - // send my status back to upstream datanode - replyOut.writeLong(expected); // send seqno upstream - replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); - - LOG.debug("PacketResponder " + numTargets + - " for block " + block + - " responded my status " + - " for seqno " + expected); - - // forward responses from downstream datanodes. - for (int i = 0; i < numTargets && datanode.shouldRun; i++) { - try { - if (op == DataTransferProtocol.OP_STATUS_SUCCESS) { - op = mirrorIn.readShort(); - if (op != DataTransferProtocol.OP_STATUS_SUCCESS) { - LOG.debug("PacketResponder for block " + block + - ": error code received from downstream " + - " datanode[" + i + "] " + op); - } - } - } catch (Throwable e) { - op = DataTransferProtocol.OP_STATUS_ERROR; + // construct my ack message + short[] replies = null; + if (!didRead) { // no ack is read + replies = new short[2]; + replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS; + replies[1] = DataTransferProtocol.OP_STATUS_ERROR; + } else { + replies = new short[1+ack.getNumOfReplies()]; + replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS; + for (int i=0; i 0) { + if (!replyAck.isSuccess() && receiver.clientName.length() > 0) { running = false; } } catch (IOException e) { Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=888718&r1=888717&r2=888718&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original) +++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Wed Dec 9 07:29:28 2009 @@ -250,9 +250,9 @@ sendOut.writeInt(0); // chunk length sendOut.writeInt(0); // zero checksum //ok finally write a block with 0 len - Text.writeString(recvOut, ""); // first bad node - recvOut.writeLong(100); // sequencenumber - recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS); + Text.writeString(recvOut, ""); + new DataTransferProtocol.PipelineAck(100, + new short[]{DataTransferProtocol.OP_STATUS_SUCCESS}).write(recvOut); sendRecvData("Writing a zero len block blockid " + newBlockId, false); /* Test OP_READ_BLOCK */