Subject: svn commit: r1164767 - in /hadoop/common/branches/branch-0.20-security: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/...
Date: Sat, 03 Sep 2011 00:20:40 -0000
From: suresh@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Message-Id: <20110903002040.BA6B62388900@eris.apache.org>

Author: suresh
Date: Sat Sep  3 00:20:39 2011
New Revision: 1164767

URL: http://svn.apache.org/viewvc?rev=1164767&view=rev
Log:
Port from 0.20-append - HDFS-724. Use a bidirectional heartbeat to detect
stuck pipeline. Contributed by Hairong Kuang.

Added:
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestStuckDataNode.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
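In outline, the change works like this: while a write pipeline is idle, the
client's DataStreamer now wakes at least every timeoutValue/2 milliseconds and
pushes a zero-data heartbeat packet (seqno -1) down the pipeline; each
datanode forwards it and acks it back upstream, so traffic flows in both
directions even when there is nothing to write, and a wedged datanode turns
into an ordinary socket read timeout instead of an indefinite hang. A minimal
sketch of that scheduling rule follows; it is an editor's illustration, not
code from the patch, and IDLE_TIMEOUT_MS and send() are invented stand-ins
for the client's timeoutValue and the real pipeline write.

    import java.util.ArrayDeque;
    import java.util.Deque;

    /**
     * Editor's sketch, not code from the patch: the scheduling rule that
     * DFSClient.DataStreamer.run() adopts in the diff below.
     */
    public class HeartbeatTimingSketch {
      static final int IDLE_TIMEOUT_MS = 60000;  // illustrative value only
      private final Deque<String> dataQueue = new ArrayDeque<String>();
      private long lastPacketMs = System.currentTimeMillis();

      synchronized void streamOnce() throws InterruptedException {
        long now = System.currentTimeMillis();
        // Sleep only until half the timeout has passed since the last packet;
        // the pre-patch loop just slept in fixed one-second ticks when idle.
        while (dataQueue.isEmpty() && now - lastPacketMs < IDLE_TIMEOUT_MS / 2) {
          long timeout = IDLE_TIMEOUT_MS / 2 - (now - lastPacketMs);
          wait(timeout <= 0 ? 1000 : timeout);
          now = System.currentTimeMillis();
        }
        // Idle for timeout/2 with nothing queued: manufacture a heartbeat.
        String packet = dataQueue.isEmpty() ? "HEARTBEAT seqno=-1"
                                            : dataQueue.removeFirst();
        send(packet);
        lastPacketMs = System.currentTimeMillis(); // heartbeats reset the clock too
      }

      private void send(String packet) { System.out.println("sent " + packet); }
    }

On the datanode side, the old special case for the last node in the pipeline
(lastDataNodeRun, removed in the BlockReceiver diff below) disappears: every
PacketResponder now simply waits for the local write, reads an ack from
downstream if there is a downstream, and answers upstream.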
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164767&r1=1164766&r2=1164767&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sat Sep  3 00:20:39 2011
@@ -80,6 +80,9 @@ Release 0.20.205.0 - unreleased
     HDFS-1057. Concurrent readers hit ChecksumExceptions if following a
     writer to very end of file (Sam Rash via dhruba)
 
+    HDFS-724. Use a bidirectional heartbeat to detect stuck
+    pipeline. (hairong)
+
   IMPROVEMENTS
 
     MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1164767&r1=1164766&r2=1164767&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Sat Sep  3 00:20:39 2011
@@ -85,6 +85,7 @@ public class DFSClient implements FSCons
   private SocketFactory socketFactory;
   private int socketTimeout;
   private int datanodeWriteTimeout;
+  private int timeoutValue;  // read timeout for the socket
   final int writePacketSize;
   private final FileSystem.Statistics stats;
   private int maxBlockAcquireFailures;
@@ -194,6 +195,7 @@ public class DFSClient implements FSCons
                                      HdfsConstants.READ_TIMEOUT);
     this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
                                             HdfsConstants.WRITE_TIMEOUT);
+    this.timeoutValue = this.socketTimeout;
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
@@ -2372,7 +2374,28 @@ public class DFSClient implements FSCons
       int dataPos;
       int checksumStart;
       int checksumPos;
-
+
+      private static final long HEART_BEAT_SEQNO = -1L;
+
+      /**
+       * create a heartbeat packet
+       */
+      Packet() {
+        this.lastPacketInBlock = false;
+        this.numChunks = 0;
+        this.offsetInBlock = 0;
+        this.seqno = HEART_BEAT_SEQNO;
+
+        buffer = null;
+        int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+        buf = new byte[packetSize];
+
+        checksumStart = dataStart = packetSize;
+        checksumPos = checksumStart;
+        dataPos = dataStart;
+        maxChunks = 0;
+      }
+
       // create a new packet
       Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
         this.lastPacketInBlock = false;
@@ -2453,6 +2476,14 @@ public class DFSClient implements FSCons
         buffer.reset();
         return buffer;
       }
+
+      /**
+       * Check if this packet is a heart beat packet
+       * @return true if the sequence number is HEART_BEAT_SEQNO
+       */
+      private boolean isHeartbeatPacket() {
+        return seqno == HEART_BEAT_SEQNO;
+      }
     }
 
     //
     //
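The no-argument Packet() constructor above builds a packet that is all
header: DataNode.PKT_HEADER_LEN bytes plus a single four-byte data-length
word, seqno -1, and no checksums or payload. A sketch of roughly what that
puts on the wire follows; it assumes the 0.20-era header layout (length word,
offset in block, seqno, last-packet flag, data length), which is this
editor's reading of the branch rather than anything restated in the commit.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    /**
     * Editor's sketch of a heartbeat packet's wire form, assuming the 0.20
     * header layout. The exact accounting of the leading length word varies
     * by branch; treat this as an approximation, not the patch's code.
     */
    public class HeartbeatWireSketch {
      static final long HEART_BEAT_SEQNO = -1L;

      static byte[] encodeHeartbeat() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(4);                  // length word: only the data-length int follows
        out.writeLong(0L);                // offsetInBlock: a heartbeat carries no bytes
        out.writeLong(HEART_BEAT_SEQNO);  // the sentinel every responder recognizes
        out.writeBoolean(false);          // not the last packet in the block
        out.writeInt(0);                  // zero bytes of user data
        return bytes.toByteArray();       // 25 bytes: PKT_HEADER_LEN (21) + 4
      }

      public static void main(String[] args) throws IOException {
        System.out.println("heartbeat packet: " + encodeHeartbeat().length + " bytes");
      }
    }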
@@ -2468,6 +2499,8 @@ public class DFSClient implements FSCons
       private volatile boolean closed = false;
 
       public void run() {
+        long lastPacket = 0;
+
         while (!closed && clientRunning) {
 
           // if the Responder encountered an error, shutdown Responder
@@ -2487,23 +2520,36 @@ public class DFSClient implements FSCons
             boolean doSleep = processDatanodeError(hasError, false);
 
             // wait for a packet to be sent.
+            long now = System.currentTimeMillis();
             while ((!closed && !hasError && clientRunning 
-                   && dataQueue.size() == 0) || doSleep) {
+                   && dataQueue.size() == 0 &&
+                   (blockStream == null || (
+                    blockStream != null && now - lastPacket < timeoutValue/2)))
+                   || doSleep) {
+              long timeout = timeoutValue/2 - (now-lastPacket);
+              timeout = timeout <= 0 ? 1000 : timeout;
+
               try {
-                dataQueue.wait(1000);
+                dataQueue.wait(timeout);
+                now = System.currentTimeMillis();
               } catch (InterruptedException e) {
               }
               doSleep = false;
             }
-            if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
+            if (closed || hasError || !clientRunning) {
               continue;
             }
 
             try {
               // get packet to be sent.
-              one = dataQueue.getFirst();
+              if (dataQueue.isEmpty()) {
+                one = new Packet();  // heartbeat packet
+              } else {
+                one = dataQueue.getFirst(); // regular data packet
+              }
+
               long offsetInBlock = one.offsetInBlock;
-              
+
               // get new block from namenode.
               if (blockStream == null) {
                 LOG.debug("Allocating new block");
@@ -2525,12 +2571,14 @@ public class DFSClient implements FSCons
               ByteBuffer buf = one.getBuffer();
 
               // move packet from dataQueue to ackQueue
-              dataQueue.removeFirst();
-              dataQueue.notifyAll();
-              synchronized (ackQueue) {
-                ackQueue.addLast(one);
-                ackQueue.notifyAll();
-              } 
+              if (!one.isHeartbeatPacket()) {
+                dataQueue.removeFirst();
+                dataQueue.notifyAll();
+                synchronized (ackQueue) {
+                  ackQueue.addLast(one);
+                  ackQueue.notifyAll();
+                }
+              }
 
               // write out data to remote datanode
               blockStream.write(buf.array(), buf.position(), buf.remaining());
@@ -2539,6 +2587,8 @@ public class DFSClient implements FSCons
                 blockStream.writeInt(0); // indicate end-of-block 
               }
               blockStream.flush();
+              lastPacket = System.currentTimeMillis();
+
               if (LOG.isDebugEnabled()) {
                 LOG.debug("DataStreamer block " + block +
                           " wrote packet seqno:" + one.seqno +
@@ -2643,35 +2693,37 @@ public class DFSClient implements FSCons
             if (LOG.isDebugEnabled()) {
               LOG.debug("DFSClient for block " + block + " " + ack);
             }
+
+            // processes response status from all datanodes.
+            for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
+              short reply = ack.getReply(i);
+              if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
+                errorIndex = i; // first bad datanode
+                throw new IOException("Bad response " + reply +
+                                      " for block " + block +
+                                      " from datanode " + 
+                                      targets[i].getName());
+              }
+            }
+
             long seqno = ack.getSeqno();
-            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+            assert seqno != PipelineAck.UNKOWN_SEQNO :
+              "Ack for unkown seqno should be a failed ack: " + ack;
+            if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
               continue;
-            } else if (seqno == -2) {
-              // no nothing
-            } else {
-              Packet one = null;
-              synchronized (ackQueue) {
-                one = ackQueue.getFirst();
-              }
-              if (one.seqno != seqno) {
-                throw new IOException("Responseprocessor: Expecting seqno " + 
-                                      " for block " + block + " " +
-                                      one.seqno + " but received " + seqno);
-              }
-              lastPacketInBlock = one.lastPacketInBlock;
             }
 
-            // processes response status from all datanodes.
-            for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
-              short reply = ack.getReply(i);
-              if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
-                errorIndex = i; // first bad datanode
-                throw new IOException("Bad response " + reply +
-                                      " for block " + block +
-                                      " from datanode " + 
-                                      targets[i].getName());
-              }
+            Packet one = null;
+            synchronized (ackQueue) {
+              one = ackQueue.getFirst();
+            }
+
+            if (one.seqno != seqno) {
+              throw new IOException("Responseprocessor: Expecting seqno " + 
+                                    " for block " + block + " " +
+                                    one.seqno + " but received " + seqno);
             }
+            lastPacketInBlock = one.lastPacketInBlock;
 
             synchronized (ackQueue) {
               ackQueue.removeFirst();
@@ -3090,7 +3142,7 @@ public class DFSClient implements FSCons
         LOG.debug("Connecting to " + nodes[0].getName());
         InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
         s = socketFactory.createSocket();
-        int timeoutValue = 3000 * nodes.length + socketTimeout;
+        timeoutValue = 3000 * nodes.length + socketTimeout;
         NetUtils.connect(s, target, timeoutValue);
         s.setSoTimeout(timeoutValue);
         s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
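Note the reordering in the ResponseProcessor above: the per-datanode status
bytes are now inspected before the sequence number, because a failure ack may
arrive carrying UNKOWN_SEQNO (-2) and must still pin errorIndex on the first
bad datanode; only after that are heartbeat acks (seqno -1) consumed without
touching ackQueue. A sketch of that ordering, as an editor's condensation of
the code just shown rather than a copy of it:

    import java.io.IOException;

    /**
     * Editor's sketch of the ack-handling order in the rewritten
     * ResponseProcessor: replies first, then the heartbeat check, then the
     * seqno match against the head of the ack queue.
     */
    public class AckOrderSketch {
      static final long HEART_BEAT_SEQNO = -1L;
      static final long UNKOWN_SEQNO = -2L;   // spelling as in the branch
      static final short OP_STATUS_SUCCESS = 0;

      /** Returns true if a packet should be dequeued from the ack queue. */
      static boolean process(long seqno, short[] replies, long expectedSeqno)
          throws IOException {
        // 1. Check every reply, farthest datanode first, so the first bad
        //    node is found even when seqno is UNKOWN_SEQNO.
        for (int i = replies.length - 1; i >= 0; i--) {
          if (replies[i] != OP_STATUS_SUCCESS) {
            throw new IOException("bad response from datanode " + i);
          }
        }
        // 2. A heartbeat ack proves liveness but matches no queued packet.
        if (seqno == HEART_BEAT_SEQNO) {
          return false;
        }
        // 3. Only a real data ack is matched against the queue head.
        if (seqno != expectedSeqno) {
          throw new IOException("expected seqno " + expectedSeqno +
                                " but received " + seqno);
        }
        return true;
      }
    }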
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1164767&r1=1164766&r2=1164767&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Sat Sep  3 00:20:39 2011
@@ -36,11 +36,10 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 18:
-   *    Change the block packet ack protocol to include seqno,
-   *    numberOfReplies, reply0, reply1, ...
+   * Version 19:
+   *    A heartbeat is sent from the client to pipeline and then acked back
   */
-  public static final int DATA_TRANSFER_VERSION = 17;
+  public static final int DATA_TRANSFER_VERSION = 19;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -65,7 +64,7 @@ public interface DataTransferProtocol {
   public static class PipelineAck implements Writable {
     private long seqno;
     private short replies[];
-    final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new short[0]);
+    final public static long UNKOWN_SEQNO = -2; 
 
     /** default constructor **/
     public PipelineAck() {
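Two sentinel seqno values now coexist on the ack path: -1
(Packet.HEART_BEAT_SEQNO) marks a heartbeat ack, and -2 (UNKOWN_SEQNO,
spelled as in the source) marks a failure ack that carries no usable sequence
number; together they replace the old shared HEART_BEAT ack object. A sketch
of the version-19 ack frame as the PipelineAck writable lays it out, assuming
the field widths of the 0.20 implementation (long seqno, short reply count,
one short per reply), which this commit does not restate:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    /**
     * Editor's sketch of the v19 ack frame. Field widths are assumed from
     * the 0.20 PipelineAck writable, not quoted from this commit.
     */
    public class PipelineAckSketch {
      static final long HEART_BEAT_SEQNO = -1L;  // heartbeat ack: consume and continue
      static final long UNKOWN_SEQNO = -2L;      // failure ack with no usable seqno

      static byte[] write(long seqno, short[] replies) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(seqno);                 // -1, -2, or a real packet seqno
        out.writeShort(replies.length);       // one status per datanode
        for (short r : replies) out.writeShort(r);
        return bytes.toByteArray();
      }

      static String classify(byte[] frame) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
        long seqno = in.readLong();
        if (seqno == HEART_BEAT_SEQNO) return "heartbeat ack";
        if (seqno == UNKOWN_SEQNO) return "failure ack, unknown seqno";
        return "data ack for seqno " + seqno;
      }
    }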
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1164767&r1=1164766&r2=1164767&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Sat Sep  3 00:20:39 2011
@@ -471,6 +471,12 @@ class BlockReceiver implements java.io.C
           checksumOut.write(pktBuf, checksumOff, checksumLen);
         }
         datanode.myMetrics.incrBytesWritten(len);
+
+        /// flush entire packet before sending ack
+        flush();
+
+        // update length only after flush to disk
+        datanode.data.setVisibleLength(block, offsetInBlock);
       }
     } catch (IOException iex) {
       datanode.checkDiskError(iex);
@@ -478,12 +484,6 @@ class BlockReceiver implements java.io.C
       }
     }
 
-    /// flush entire packet before sending ack
-    flush();
-
-    // update length only after flush to disk
-    datanode.data.setVisibleLength(block, offsetInBlock);
-
     // put in queue for pending acks
     if (responder != null) {
       ((PacketResponder)responder.getRunnable()).enqueue(seqno,
@@ -757,130 +757,51 @@ class BlockReceiver implements java.io.C
       notifyAll();
     }
 
-    private synchronized void lastDataNodeRun() {
-      long lastHeartbeat = System.currentTimeMillis();
-      boolean lastPacket = false;
-      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-
-      while (running && datanode.shouldRun && !lastPacket) {
-        long now = System.currentTimeMillis();
-        try {
-
-          // wait for a packet to be sent to downstream datanode
-          while (running && datanode.shouldRun && ackQueue.size() == 0) {
-            long idle = now - lastHeartbeat;
-            long timeout = (datanode.socketTimeout/2) - idle;
-            if (timeout <= 0) {
-              timeout = 1000;
-            }
-            try {
-              wait(timeout);
-            } catch (InterruptedException e) {
-              if (running) {
-                LOG.info("PacketResponder " + numTargets +
-                         " for block " + block + " Interrupted.");
-                running = false;
-              }
-              break;
-            }
-
-            // send a heartbeat if it is time.
-            now = System.currentTimeMillis();
-            if (now - lastHeartbeat > datanode.socketTimeout/2) {
-              PipelineAck.HEART_BEAT.write(replyOut); // send heart beat
-              replyOut.flush();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("PacketResponder " + numTargets +
-                          " for block " + block +
-                          " sent a heartbeat");
-              }
-              lastHeartbeat = now;
-            }
-          }
-
-          if (!running || !datanode.shouldRun) {
-            break;
-          }
-          Packet pkt = ackQueue.removeFirst();
-          long expected = pkt.seqno;
-          notifyAll();
-          LOG.debug("PacketResponder " + numTargets +
-                    " for block " + block +
-                    " acking for packet " + expected);
-
-          // If this is the last packet in block, then close block
-          // file and finalize the block before responding success
-          if (pkt.lastPacketInBlock) {
-            if (!receiver.finalized) {
-              receiver.close();
-              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-              block.setNumBytes(receiver.offsetInBlock);
-              datanode.data.finalizeBlock(block);
-              datanode.myMetrics.incrBlocksWritten();
-              datanode.notifyNamenodeReceivedBlock(block,
-                  DataNode.EMPTY_DEL_HINT);
-              if (ClientTraceLog.isInfoEnabled() &&
-                  receiver.clientName.length() > 0) {
-                long offset = 0;
-                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                      "HDFS_WRITE", receiver.clientName, offset,
-                      datanode.dnRegistration.getStorageID(), block, endTime-startTime));
-              } else {
-                LOG.info("Received block " + block +
-                         " of size " + block.getNumBytes() +
-                         " from " + receiver.inAddr);
-              }
-            }
-            lastPacket = true;
-          }
-
-          new PipelineAck(expected, new short[]{
-              DataTransferProtocol.OP_STATUS_SUCCESS}).write(replyOut);
-          replyOut.flush();
-        } catch (Exception e) {
-          LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
-          if (running) {
-            try {
-              datanode.checkDiskError(e); // may throw an exception here
-            } catch (IOException ioe) {
-              LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
-                  ioe);
-            }
-            LOG.info("PacketResponder " + block + " " + numTargets +
-                     " Exception " + StringUtils.stringifyException(e));
-            running = false;
-          }
-        }
-      }
-      LOG.info("PacketResponder " + numTargets +
-               " for block " + block + " terminating");
-    }
-
     /**
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
      */
     public void run() {
-
-      // If this is the last datanode in pipeline, then handle differently
-      if (numTargets == 0) {
-        lastDataNodeRun();
-        return;
-      }
-
       boolean lastPacketInBlock = false;
       boolean isInterrupted = false;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         try {
-            long expected = -2;
-            PipelineAck ack = new PipelineAck();
-            long seqno = -2;
-            boolean localMirrorError = mirrorError;
-            try {
-              if (!localMirrorError) {
+            /**
+             * Sequence number -2 is a special value that is used when
+             * a DN fails to read an ack from a downstream. In this case,
+             * it needs to tell the client that there's been an error downstream
+             * but has no valid sequence number to use. Thus, -2 is used
+             * as an UNKNOWN value.
+             */
+            long expected = PipelineAck.UNKOWN_SEQNO;
+            long seqno = PipelineAck.UNKOWN_SEQNO;;
+
+            PipelineAck ack = new PipelineAck();
+            boolean localMirrorError = mirrorError;
+            try {
+              Packet pkt = null;
+              synchronized (this) {
+                // wait for a packet to arrive
+                while (running && datanode.shouldRun && ackQueue.size() == 0) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("PacketResponder " + numTargets +
+                              " seqno = " + seqno +
+                              " for block " + block +
+                              " waiting for local datanode to finish write.");
+                  }
+                  wait();
+                }
+                if (!running || !datanode.shouldRun) {
+                  break;
+                }
+                pkt = ackQueue.removeFirst();
+                expected = pkt.seqno;
+                notifyAll();
+              }
+              // receive an ack if DN is not the last one in the pipeline
+              if (numTargets > 0 && !localMirrorError) {
                 // read an ack from downstream datanode
                 ack.readFields(mirrorIn);
                 if (LOG.isDebugEnabled()) {
@@ -888,34 +809,15 @@ class BlockReceiver implements java.io.C
                             " for block " + block + " got " + ack);
                 }
                 seqno = ack.getSeqno();
-              }
-              if (seqno >= 0 || localMirrorError) {
-                Packet pkt = null;
-                synchronized (this) {
-                  while (running && datanode.shouldRun && ackQueue.size() == 0) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("PacketResponder " + numTargets +
-                                " seqno = " + seqno +
-                                " for block " + block +
-                                " waiting for local datanode to finish write.");
-                    }
-                    wait();
-                  }
-                  if (!running || !datanode.shouldRun) {
-                    break;
-                  }
-                  pkt = ackQueue.removeFirst();
-                  expected = pkt.seqno;
-                  notifyAll();
-                  if (seqno != expected && !localMirrorError) {
-                    throw new IOException("PacketResponder " + numTargets +
-                                          " for block " + block +
-                                          " expected seqno:" + expected +
-                                          " received:" + seqno);
-                  }
-                  lastPacketInBlock = pkt.lastPacketInBlock;
+                // verify seqno
+                if (seqno != expected) {
+                  throw new IOException("PacketResponder " + numTargets +
+                                        " for block " + block +
+                                        " expected seqno:" + expected +
+                                        " received:" + seqno);
                 }
               }
+              lastPacketInBlock = pkt.lastPacketInBlock;
             } catch (InterruptedException ine) {
               isInterrupted = true;
             } catch (IOException ioe) {
@@ -968,25 +870,21 @@ class BlockReceiver implements java.io.C
               }
             }
 
-            PipelineAck replyAck;
-            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-              replyAck = ack; // continue to send keep alive
+            // construct my ack message
+            short[] replies = null;
+            if (mirrorError) { // no ack is read
+              replies = new short[2];
+              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+              replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
             } else {
-              // construct my ack message
-              short[] replies = null;
-              if (mirrorError) { // no ack is read
-                replies = new short[2];
-                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-                replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
-              } else {
-                replies = new short[1+ack.getNumOfReplies()];
-                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-                for (int i=0; i
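The message is cut off in the final hunk above, partway through the loop that
copies downstream replies into the new ack. What the surviving lines
establish is the relay rule: each PacketResponder answers upstream with its
own OP_STATUS_SUCCESS followed by whatever the downstream node reported, or
with SUCCESS then ERROR when no downstream ack could be read (mirrorError). A
sketch of that rule, with the truncated copy loop assumed from context:

    /**
     * Editor's sketch of the PacketResponder relay rule in the final
     * (truncated) hunk. The reply-copying loop is inferred from context;
     * status values follow the 0.20 DataTransferProtocol constants.
     */
    public class AckRelaySketch {
      static final short OP_STATUS_SUCCESS = 0;
      static final short OP_STATUS_ERROR = 1;

      static short[] buildReplies(boolean mirrorError, short[] downstreamReplies) {
        if (mirrorError) {
          // No ack was read from downstream: report self OK, downstream bad.
          return new short[] { OP_STATUS_SUCCESS, OP_STATUS_ERROR };
        }
        short[] replies = new short[1 + downstreamReplies.length];
        replies[0] = OP_STATUS_SUCCESS;                 // this datanode's status
        for (int i = 0; i < downstreamReplies.length; i++) {
          replies[i + 1] = downstreamReplies[i];        // relay the rest upstream
        }
        return replies;
      }
    }

Under this scheme the client can always attribute a failure: scanning the
replies from the far end, the first non-SUCCESS entry is the first bad
datanode, which is exactly how the rewritten ResponseProcessor in
DFSClient.java assigns errorIndex.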