From: hairong@apache.org
To: hdfs-commits@hadoop.apache.org
Subject: svn commit: r820062 - in /hadoop/hdfs/branches/HDFS-265/src: java/org/apache/hadoop/hdfs/ java/org/apache/hadoop/hdfs/server/datanode/ test/hdfs/org/apache/hadoop/hdfs/ test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date: Tue, 29 Sep 2009 18:37:22 -0000

Author: hairong
Date: Tue Sep 29 18:37:21 2009
New Revision: 820062

URL: http://svn.apache.org/viewvc?rev=820062&view=rev
Log: HDFS-642. Support pipeline close and close error recovery. Contributed by Hairong Kuang.
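The change teaches the client to close a pipeline explicitly: when a block boundary is reached, DFSClient queues an extra zero-length packet flagged lastPacketInBlock, drains all outstanding acks, switches the streamer into the PIPELINE_CLOSE stage, and only then tears the pipeline down via endBlock(). The following is a minimal, self-contained sketch of that idea; Packet, the queues, and the field names are simplified stand-ins for illustration, not the real DFSClient internals.

// Simplified illustration of the close-packet protocol described above.
// These types are illustrative stand-ins, not the actual DFSClient classes.
import java.util.ArrayDeque;
import java.util.Deque;

class PipelineCloseSketch {
  static class Packet {
    final long seqno;
    final long offsetInBlock;
    final int dataLen;
    boolean lastPacketInBlock;

    Packet(long seqno, long offsetInBlock, int dataLen) {
      this.seqno = seqno;
      this.offsetInBlock = offsetInBlock;
      this.dataLen = dataLen;
    }
  }

  private final Deque<Packet> dataQueue = new ArrayDeque<>();  // packets waiting to be sent
  private final Deque<Packet> ackQueue = new ArrayDeque<>();   // packets sent but not yet acked
  private long nextSeqno = 0;
  private long bytesCurBlock = 0;
  private final long blockSize;

  PipelineCloseSketch(long blockSize) {
    this.blockSize = blockSize;
  }

  /** Queue a data packet; at a block boundary also queue an empty packet
   *  whose lastPacketInBlock flag tells the datanodes to close the block. */
  synchronized void queuePacket(int dataLen) {
    dataQueue.addLast(new Packet(nextSeqno++, bytesCurBlock, dataLen));
    bytesCurBlock += dataLen;
    if (bytesCurBlock == blockSize) {
      Packet close = new Packet(nextSeqno++, bytesCurBlock, 0);
      close.lastPacketInBlock = true;
      dataQueue.addLast(close);
      bytesCurBlock = 0;
    }
    notifyAll();
  }

  /** Before the close packet goes out, the streamer drains every outstanding
   *  ack so that the empty packet is the last thing the datanodes see. */
  synchronized void waitForAcksBeforeClose() throws InterruptedException {
    while (!ackQueue.isEmpty()) {
      wait(1000);  // woken by the response thread as acks arrive
    }
  }
}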
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 29 18:37:21 2009 @@ -1431,8 +1431,8 @@ int dataLen = in.readInt(); // Sanity check the lengths - if ( dataLen < 0 || - ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) || + if ( ( dataLen <= 0 && !lastPacketInBlock ) || + ( dataLen != 0 && lastPacketInBlock) || (seqno != (lastSeqNo + 1)) ) { throw new IOException("BlockReader: error in packet header" + "(chunkOffset : " + chunkOffset + @@ -2598,7 +2598,16 @@ response.start(); stage = BlockConstructionStage.DATA_STREAMING; } - + + private void endBlock() { + LOG.debug("Closing old block " + block); + this.setName("DataStreamer for file " + src); + closeResponder(); + closeStream(); + nodes = null; + stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; + } + /* * streamer thread is the only thread that opens streams to datanode, * and closes them. Any error recovery is also done by this thread. @@ -2642,8 +2651,6 @@ one = dataQueue.getFirst(); } - long offsetInBlock = one.offsetInBlock; - // get new block from namenode. if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { LOG.debug("Allocating new block"); @@ -2655,14 +2662,34 @@ initDataStreaming(); } - if (offsetInBlock >= blockSize) { + long lastByteOffsetInBlock = one.getLastByteOffsetBlock(); + if (lastByteOffsetInBlock > blockSize) { throw new IOException("BlockSize " + blockSize + " is smaller than data size. 
" + " Offset of packet in block " + - offsetInBlock + + lastByteOffsetInBlock + " Aborting file " + src); } + if (one.lastPacketInBlock) { + // wait for all data packets have been successfully acked + synchronized (dataQueue) { + while (!streamerClosed && !hasError && + ackQueue.size() != 0 && clientRunning) { + try { + // wait for acks to arrive from datanodes + dataQueue.wait(1000); + } catch (InterruptedException e) { + } + } + } + if (streamerClosed || hasError || !clientRunning) { + continue; + } + stage = BlockConstructionStage.PIPELINE_CLOSE; + } + + // send the packet ByteBuffer buf = one.getBuffer(); synchronized (dataQueue) { @@ -2674,11 +2701,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("DataStreamer block " + block + - " sending packet seqno:" + one.seqno + - " size:" + buf.remaining() + - " offsetInBlock:" + one.offsetInBlock + - " lastPacketInBlock:" + one.lastPacketInBlock + - " lastByteOffsetInBlock" + one.getLastByteOffsetBlock()); + " sending packet " + one); } // write out data to remote datanode @@ -2690,22 +2713,31 @@ if (bytesSent < tmpBytesSent) { bytesSent = tmpBytesSent; } - + + if (streamerClosed || hasError || !clientRunning) { + continue; + } + + // Is this block full? if (one.lastPacketInBlock) { + // wait for the close packet has been acked synchronized (dataQueue) { - while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) { - try { - dataQueue.wait(1000); // wait for acks to arrive from datanodes - } catch (InterruptedException e) { - } + while (!streamerClosed && !hasError && + ackQueue.size() != 0 && clientRunning) { + dataQueue.wait(1000);// wait for acks to arrive from datanodes } } - - if (ackQueue.isEmpty()) { // done receiving all acks - // indicate end-of-block - blockStream.writeInt(0); - blockStream.flush(); + if (streamerClosed || hasError || !clientRunning) { + continue; } + + endBlock(); + } + if (progress != null) { progress.progress(); } + + // This is used by unit test to trigger race conditions. + if (artificialSlowdown != 0 && clientRunning) { + Thread.sleep(artificialSlowdown); } } catch (Throwable e) { LOG.warn("DataStreamer Exception: " + @@ -2718,29 +2750,6 @@ streamerClosed = true; } } - - - if (streamerClosed || hasError || !clientRunning) { - continue; - } - - // Is this block full? - if (one.lastPacketInBlock) { - LOG.debug("Closing old block " + block); - this.setName("DataStreamer for file " + src); - closeResponder(); - closeStream(); - nodes = null; - stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; - } - if (progress != null) { progress.progress(); } - - // This is used by unit test to trigger race conditions. - if (artificialSlowdown != 0 && clientRunning) { - try { - Thread.sleep(artificialSlowdown); - } catch (InterruptedException e) {} - } } closeInternal(); } @@ -2928,7 +2937,15 @@ boolean doSleep = setupPipelineForAppendOrRecovery(); if (!streamerClosed && clientRunning) { - initDataStreaming(); + if (stage == BlockConstructionStage.PIPELINE_CLOSE) { + synchronized (dataQueue) { + dataQueue.remove(); // remove the end of block packet + dataQueue.notifyAll(); + } + endBlock(); + } else { + initDataStreaming(); + } } return doSleep; @@ -3392,15 +3409,6 @@ ", blockSize=" + blockSize + ", appendChunk=" + appendChunk); } - // - // if we allocated a new packet because we encountered a block - // boundary, reset bytesCurBlock. 
- // - if (bytesCurBlock == blockSize) { - currentPacket.lastPacketInBlock = true; - bytesCurBlock = 0; - lastFlushOffset = -1; - } waitAndQueuePacket(currentPacket); currentPacket = null; @@ -3413,6 +3421,20 @@ } int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize); computePacketChunkSize(psize, bytesPerChecksum); + + // + // if encountering a block boundary, send an empty packet to + // indicate the end of block and reset bytesCurBlock. + // + if (bytesCurBlock == blockSize) { + currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, + bytesCurBlock); + currentPacket.lastPacketInBlock = true; + waitAndQueuePacket(currentPacket); + currentPacket = null; + bytesCurBlock = 0; + lastFlushOffset = -1; + } } } @@ -3556,21 +3578,22 @@ try { flushBuffer(); // flush from all upper layers - // Mark that this packet is the last packet in block. - // If there are no outstanding packets and the last packet - // was not the last one in the current block, then create a - // packet with empty payload. - if (currentPacket == null && bytesCurBlock != 0) { - currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock); - } if (currentPacket != null) { + waitAndQueuePacket(currentPacket); + } + + if (bytesCurBlock != 0) { + // send an empty packet to mark the end of the block + currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, + bytesCurBlock); currentPacket.lastPacketInBlock = true; } flushInternal(); // flush all data to Datanodes + LOG.info("Done flushing"); // get last block before destroying the streamer Block lastBlock = streamer.getBlock(); + LOG.info("Closing the streams..."); closeThreads(false); completeFile(lastBlock); leasechecker.remove(src); Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Sep 29 18:37:21 2009 @@ -102,14 +102,8 @@ replicaInfo = datanode.data.createRbw(block); break; case PIPELINE_SETUP_STREAMING_RECOVERY: - if (datanode.data.isValidBlock(block)) { - // pipeline failed after the replica is finalized. This will be - // handled differently when pipeline close/recovery is introduced - replicaInfo = datanode.data.append(block, newGs, maxBytesRcvd); - } else { - replicaInfo = datanode.data.recoverRbw( - block, newGs, minBytesRcvd, maxBytesRcvd); - } + replicaInfo = datanode.data.recoverRbw( + block, newGs, minBytesRcvd, maxBytesRcvd); block.setGenerationStamp(newGs); break; case PIPELINE_SETUP_APPEND: @@ -330,7 +324,7 @@ * It tries to read a full packet with single read call. * Consecutive packets are usually of the same length. */ - private int readNextPacket() throws IOException { + private void readNextPacket() throws IOException { /* This dances around buf a little bit, mainly to read * full packet with single read and to accept arbitarary size * for next packet at the same time. @@ -366,12 +360,6 @@ int payloadLen = buf.getInt(); buf.reset(); - if (payloadLen == 0) { - //end of stream! - buf.limit(buf.position() + SIZE_OF_INTEGER); - return 0; - } - // check corrupt values for pktLen, 100MB upper limit should be ok? 
if (payloadLen < 0 || payloadLen > (100*1024*1024)) { throw new IOException("Incorrect value for packet payload : " + @@ -411,21 +399,15 @@ if (pktSize > maxPacketReadLen) { maxPacketReadLen = pktSize; } - - return payloadLen; } /** * Receives and processes a packet. It can contain many chunks. - * returns size of the packet. + * returns the number of data bytes that the packet has. */ private int receivePacket() throws IOException { - - int payloadLen = readNextPacket(); - - if (payloadLen <= 0) { - return payloadLen; - } + // read the next packet + readNextPacket(); buf.mark(); //read the header @@ -451,7 +433,7 @@ if (LOG.isDebugEnabled()){ LOG.debug("Receiving one packet for block " + block + - " of length " + payloadLen + + " of length " + len + " seqno " + seqno + " offsetInBlock " + offsetInBlock + " lastPacketInBlock " + lastPacketInBlock); @@ -462,6 +444,12 @@ if (replicaInfo.getNumBytes() < offsetInBlock) { replicaInfo.setNumBytes(offsetInBlock); } + + // put in queue for pending acks + if (responder != null) { + ((PacketResponder)responder.getRunnable()).enqueue(seqno, + lastPacketInBlock, offsetInBlock); + } //First write the packet to the mirror: if (mirrorOut != null) { @@ -475,8 +463,8 @@ buf.position(endOfHeader); - if (len == 0) { - LOG.debug("Receiving empty packet for block " + block); + if (lastPacketInBlock || len == 0) { + LOG.debug("Receiving an empty packet or the end of the block " + block); } else { int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)* checksumSize; @@ -539,17 +527,11 @@ /// flush entire packet before sending ack flush(); - // put in queue for pending acks - if (responder != null) { - ((PacketResponder)responder.getRunnable()).enqueue(seqno, - lastPacketInBlock, offsetInBlock); - } - if (throttler != null) { // throttle I/O - throttler.throttle(payloadLen); + throttler.throttle(len); } - return payloadLen; + return len; } void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException { @@ -578,20 +560,10 @@ } /* - * Receive until packet length is zero. + * Receive until packet has zero bytes of data. */ while (receivePacket() > 0) {} - // flush the mirror out - if (mirrorOut != null) { - try { - mirrorOut.writeInt(0); // mark the end of the block - mirrorOut.flush(); - } catch (IOException e) { - handleMirrorOutError(e); - } - } - // wait for all outstanding packet responses. And then // indicate responder to gracefully shutdown. // Mark that responder has been closed for future processing @@ -846,9 +818,7 @@ final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; block.setNumBytes(replicaInfo.getNumBytes()); datanode.data.finalizeBlock(block); - datanode.myMetrics.blocksWritten.inc(); - datanode.notifyNamenodeReceivedBlock(block, - DataNode.EMPTY_DEL_HINT); + datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT); if (ClientTraceLog.isInfoEnabled() && receiver.clientName.length() > 0) { long offset = 0; @@ -987,9 +957,7 @@ final long endTime = ClientTraceLog.isInfoEnabled() ? 
System.nanoTime() : 0; block.setNumBytes(replicaInfo.getNumBytes()); datanode.data.finalizeBlock(block); - datanode.myMetrics.blocksWritten.inc(); - datanode.notifyNamenodeReceivedBlock(block, - DataNode.EMPTY_DEL_HINT); + datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT); if (ClientTraceLog.isInfoEnabled() && receiver.clientName.length() > 0) { long offset = 0; Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Sep 29 18:37:21 2009 @@ -270,10 +270,6 @@ int len = Math.min((int) (endOffset - offset), bytesPerChecksum*maxChunks); - if (len == 0) { - return 0; - } - int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum; int packetLen = len + numChunks*checksumSize + 4; pkt.clear(); @@ -282,7 +278,7 @@ pkt.putInt(packetLen); pkt.putLong(offset); pkt.putLong(seqno); - pkt.put((byte)((offset + len >= endOffset) ? 1 : 0)); + pkt.put((byte)((len == 0) ? 1 : 0)); //why no ByteBuf.putBoolean()? pkt.putInt(len); @@ -443,7 +439,8 @@ seqno++; } try { - out.writeInt(0); // mark the end of block + // send an empty packet to mark the end of the block + sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks); out.flush(); } catch (IOException e) { //socket error throw ioeToSocketException(e); Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep 29 18:37:21 2009 @@ -1275,6 +1275,20 @@ } } } + + /** + * After a block becomes finalized, a datanode increases metric counter, + * notifies namenode, and adds it to the block scanner + * @param block + * @param delHint + */ + void closeBlock(Block block, String delHint) { + myMetrics.blocksWritten.inc(); + notifyNamenodeReceivedBlock(block, delHint); + if (blockScanner != null) { + blockScanner.addBlock(block); + } + } /** * No matter what kind of exception we get, keep retrying to offerService(). 
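The new DataNode.closeBlock helper above gathers the side effects of finalizing a replica (metric update, namenode notification, block-scanner registration) into one place, so both the packet responder and the close-recovery path in DataXceiver can share it. A stand-alone sketch of that consolidation, with BlockScanner and Namenode as illustrative placeholders rather than the real HDFS types:

// Minimal stand-alone sketch of the consolidation done by DataNode.closeBlock above.
// BlockScanner, Namenode, and the counter are illustrative, not HDFS classes.
class CloseBlockSketch {
  interface BlockScanner { void addBlock(String blockId); }
  interface Namenode   { void blockReceived(String blockId, String delHint); }

  private final BlockScanner blockScanner;  // may be null when scanning is disabled
  private final Namenode namenode;
  private long blocksWritten = 0;           // stands in for the blocksWritten metric

  CloseBlockSketch(BlockScanner scanner, Namenode nn) {
    this.blockScanner = scanner;
    this.namenode = nn;
  }

  /** Everything that has to happen once a replica is finalized, in one place. */
  void closeBlock(String blockId, String delHint) {
    blocksWritten++;                           // bump the write counter
    namenode.blockReceived(blockId, delHint);  // tell the namenode about the new replica
    if (blockScanner != null) {                // the scanner is optional, guard as the real code does
      blockScanner.addBlock(blockId);
    }
  }
}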
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Sep 29 18:37:21 2009 @@ -252,12 +252,17 @@ String firstBadLink = ""; // first datanode that failed in connection setup DataTransferProtocol.Status mirrorInStatus = SUCCESS; try { - // open a block receiver and check if the block does not exist - blockReceiver = new BlockReceiver(block, in, - s.getRemoteSocketAddress().toString(), - s.getLocalSocketAddress().toString(), - stage, newGs, minBytesRcvd, maxBytesRcvd, - client, srcDataNode, datanode); + if (client.length() == 0 || + stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { + // open a block receiver + blockReceiver = new BlockReceiver(block, in, + s.getRemoteSocketAddress().toString(), + s.getLocalSocketAddress().toString(), + stage, newGs, minBytesRcvd, maxBytesRcvd, + client, srcDataNode, datanode); + } else { + datanode.data.recoverClose(block, newGs, minBytesRcvd); + } // // Open network conn to backup machine, if @@ -289,7 +294,9 @@ pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client, srcDataNode, targets, accessToken); - blockReceiver.writeChecksumHeader(mirrorOut); + if (blockReceiver != null) { // send checksum header + blockReceiver.writeChecksumHeader(mirrorOut); + } mirrorOut.flush(); // read connect ack (only for clients, not for replication req) @@ -340,24 +347,31 @@ } // receive the block and mirror to the next target - String mirrorAddr = (mirrorSock == null) ? null : mirrorNode; - blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, - mirrorAddr, null, targets.length); + if (blockReceiver != null) { + String mirrorAddr = (mirrorSock == null) ? null : mirrorNode; + blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, + mirrorAddr, null, targets.length); + } - // if this write is for a replication request (and not - // from a client), then confirm block. For client-writes, + // update its generation stamp + if (client.length() != 0 && + stage != BlockConstructionStage.PIPELINE_SETUP_CREATE) { + block.setGenerationStamp(newGs); + block.setNumBytes(minBytesRcvd); + } + + // if this write is for a replication request or recovering + // a failed close for client, then confirm block. For other client-writes, // the block is finalized in the PacketResponder. 
- if (client.length() == 0) { - datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT); + if (client.length() == 0 || + stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { + datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT); LOG.info("Received block " + block + " src: " + remoteAddress + " dest: " + localAddress + " of size " + block.getNumBytes()); } - if (datanode.blockScanner != null) { - datanode.blockScanner.addBlock(block); - } } catch (IOException ioe) { LOG.info("writeBlock " + block + " received exception " + ioe); Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Sep 29 18:37:21 2009 @@ -1175,17 +1175,21 @@ return newReplicaInfo; } - @Override // FSDatasetInterface - public synchronized ReplicaInPipelineInterface recoverAppend(Block b, - long newGS, long expectedBlockLen) throws IOException { - DataNode.LOG.info("Recover failed append to " + b); - + private ReplicaInfo recoverCheck(Block b, long newGS, + long expectedBlockLen) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId()); if (replicaInfo == null) { throw new ReplicaNotFoundException( ReplicaNotFoundException.NON_EXISTENT_REPLICA + b); } + // check state + if (replicaInfo.getState() != ReplicaState.FINALIZED && + replicaInfo.getState() != ReplicaState.RBW) { + throw new ReplicaNotFoundException( + ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo); + } + // check generation stamp long replicaGenerationStamp = replicaInfo.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || @@ -1219,20 +1223,39 @@ " with a length of " + replicaLen + " expected length is " + expectedBlockLen); } + + return replicaInfo; + } + @Override // FSDatasetInterface + public synchronized ReplicaInPipelineInterface recoverAppend(Block b, + long newGS, long expectedBlockLen) throws IOException { + DataNode.LOG.info("Recover failed append to " + b); + + ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // change the replica's state/gs etc. - switch (replicaInfo.getState()) { - case FINALIZED: + if (replicaInfo.getState() == ReplicaState.FINALIZED ) { return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes()); - case RBW: + } else { //RBW bumpReplicaGS(replicaInfo, newGS); return (ReplicaBeingWritten)replicaInfo; - default: - throw new ReplicaNotFoundException( - ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo); } } + @Override + public void recoverClose(Block b, long newGS, + long expectedBlockLen) throws IOException { + DataNode.LOG.info("Recover failed close " + b); + // check replica's state + ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); + // bump the replica's GS + bumpReplicaGS(replicaInfo, newGS); + // finalize the replica if RBW + if (replicaInfo.getState() == ReplicaState.RBW) { + finalizeBlock(replicaInfo); + } + } + /** * Bump a replica's generation stamp to a new one. * Its on-disk meta file name is renamed to be the new one too. 
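FSDataset.recoverClose above reuses the new recoverCheck to validate a replica before bumping its generation stamp and finalizing it if it is still being written. A condensed sketch of that state check follows, with a simplified Replica type and simplified generation-stamp and length checks (the real code compares against both the block's and the new generation stamp):

// Hedged sketch of the close-recovery check added to FSDataset above.
// ReplicaState and Replica are simplified stand-ins for the real classes.
import java.io.IOException;

class RecoverCloseSketch {
  enum ReplicaState { FINALIZED, RBW, RWR, RUR, TEMPORARY }

  static class Replica {
    ReplicaState state;
    long generationStamp;
    long numBytes;
    Replica(ReplicaState s, long gs, long len) { state = s; generationStamp = gs; numBytes = len; }
  }

  /** Recover a failed pipeline close: only FINALIZED or RBW replicas qualify;
   *  the generation stamp is bumped and an RBW replica is finalized. */
  static void recoverClose(Replica r, long newGS, long expectedLen) throws IOException {
    if (r.state != ReplicaState.FINALIZED && r.state != ReplicaState.RBW) {
      throw new IOException("Cannot recover close of a replica in state " + r.state);
    }
    if (r.generationStamp >= newGS) {
      throw new IOException("Unexpected generation stamp " + r.generationStamp);
    }
    if (r.numBytes != expectedLen) {
      throw new IOException("Replica length " + r.numBytes + " != expected " + expectedLen);
    }
    r.generationStamp = newGS;              // bump the GS, as bumpReplicaGS does
    if (r.state == ReplicaState.RBW) {
      r.state = ReplicaState.FINALIZED;     // finalize the RBW replica
    }
  }
}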
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Sep 29 18:37:21 2009 @@ -240,6 +240,18 @@ long newGS, long expectedBlockLen) throws IOException; /** + * Recover a failed pipeline close + * It bumps the replica's generation stamp and finalize it if RBW replica + * + * @param b block + * @param newGS the new generation stamp for the replica + * @param expectedBlockLen the number of bytes the replica is expected to have + * @throws IOException + */ + public void recoverClose(Block b, + long newGS, long expectedBlockLen) throws IOException; + + /** * Update the block to the new generation stamp and length. */ public void updateBlock(Block oldblock, Block newblock) throws IOException; Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Tue Sep 29 18:37:21 2009 @@ -33,7 +33,7 @@ final static String UNFINALIZED_REPLICA = "Cannot append to an unfinalized replica "; final static String UNFINALIZED_AND_NONRBW_REPLICA = - "Cannot recover appending to a replica that's not FINALIZED and not RBW "; + "Cannot recover append/close to a replica that's not FINALIZED and not RBW "; final static String NON_EXISTENT_REPLICA = "Cannot append to a non-existent replica "; final static String UNEXPECTED_GS_REPLICA = Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Sep 29 18:37:21 2009 @@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.common.HdfsConstants; @@ -154,10 +153,7 @@ sendOut.writeInt(0); // chunk length sendOut.writeInt(0); // zero checksum - - // mark the end of block - sendOut.writeInt(0); - + //ok finally write a block with 0 
len SUCCESS.write(recvOut); Text.writeString(recvOut, ""); // first bad node @@ -177,6 +173,11 @@ if (eofExcepted) { ERROR.write(recvOut); sendRecvData(description, true); + } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { + //ok finally write a block with 0 len + SUCCESS.write(recvOut); + Text.writeString(recvOut, ""); // first bad node + sendRecvData(description, false); } else { writeZeroLengthPacket(block, description); } @@ -208,8 +209,7 @@ long newGS = firstBlock.getGenerationStamp() + 1; testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, - newGS, "Successful for now", false); - firstBlock.setGenerationStamp(newGS); + newGS, "Cannot recover data streaming to a finalized replica", true); // test PIPELINE_SETUP_APPEND on an existing block newGS = firstBlock.getGenerationStamp() + 1; testWrite(firstBlock, @@ -217,10 +217,21 @@ newGS, "Append to a finalized replica", false); firstBlock.setGenerationStamp(newGS); // test PIPELINE_SETUP_APPEND_RECOVERY on an existing block + file = new Path("dataprotocol1.dat"); + DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L); + firstBlock = DFSTestUtil.getFirstBlock(fileSys, file); newGS = firstBlock.getGenerationStamp() + 1; testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS, "Recover appending to a finalized replica", false); + // test PIPELINE_CLOSE_RECOVERY on an existing block + file = new Path("dataprotocol2.dat"); + DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L); + firstBlock = DFSTestUtil.getFirstBlock(fileSys, file); + newGS = firstBlock.getGenerationStamp() + 1; + testWrite(firstBlock, + BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, newGS, + "Recover failed close to a finalized replica", false); firstBlock.setGenerationStamp(newGS); /* Test writing to a new block */ @@ -276,11 +287,19 @@ newGS, "Recover append to a RBW replica", false); firstBlock.setGenerationStamp(newGS); // test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block + file = new Path("dataprotocol2.dat"); + DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L); + out = (DFSOutputStream)(fileSys.append(file). + getWrappedStream()); + out.write(1); + out.hflush(); + in = fileSys.open(file); + firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock(); + firstBlock.setNumBytes(2L); newGS = firstBlock.getGenerationStamp() + 1; testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, newGS, "Recover a RBW replica", false); - firstBlock.setGenerationStamp(newGS); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Tue Sep 29 18:37:21 2009 @@ -36,7 +36,7 @@ /** This class implements some of tests posted in HADOOP-2658. 
*/ public class TestFileAppend3 extends junit.framework.TestCase { - static final long BLOCK_SIZE = 3 * 64 * 1024; + static final long BLOCK_SIZE = 64 * 1024; static final short REPLICATION = 3; static final int DATANODE_NUM = 5; Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Sep 29 18:37:21 2009 @@ -484,18 +484,36 @@ public synchronized ReplicaInPipelineInterface recoverAppend(Block b, long newGS, long expectedBlockLen) throws IOException { BInfo binfo = blockMap.get(b); - if (binfo == null || !binfo.isFinalized()) { + if (binfo == null) { throw new ReplicaNotFoundException("Block " + b + " is not valid, and cannot be appended to."); } if (binfo.isFinalized()) { binfo.unfinalizeBlock(); } + blockMap.remove(b); binfo.theBlock.setGenerationStamp(newGS); + blockMap.put(binfo.theBlock, binfo); return binfo; } @Override + public void recoverClose(Block b, long newGS, + long expectedBlockLen) throws IOException { + BInfo binfo = blockMap.get(b); + if (binfo == null) { + throw new ReplicaNotFoundException("Block " + b + + " is not valid, and cannot be appended to."); + } + if (!binfo.isFinalized()) { + binfo.finalizeBlock(binfo.getNumBytes()); + } + blockMap.remove(b); + binfo.theBlock.setGenerationStamp(newGS); + blockMap.put(binfo.theBlock, binfo); + } + + @Override public synchronized ReplicaInPipelineInterface recoverRbw(Block b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { BInfo binfo = blockMap.get(b); @@ -507,7 +525,9 @@ throw new ReplicaAlreadyExistsException("Block " + b + " is valid, and cannot be written to."); } + blockMap.remove(b); binfo.theBlock.setGenerationStamp(newGS); + blockMap.put(binfo.theBlock, binfo); return binfo; } Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Sep 29 18:37:21 2009 @@ -32,8 +32,11 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.AccessToken; @@ -114,17 +117,12 @@ DataOutputStream out = new 
DataOutputStream( s.getOutputStream()); - out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); - WRITE_BLOCK.write(out); - out.writeLong( block.getBlock().getBlockId()); - out.writeLong( block.getBlock().getGenerationStamp() ); - out.writeInt(1); - out.writeBoolean( false ); // recovery flag - Text.writeString( out, "" ); - out.writeBoolean(false); // Not sending src node information - out.writeInt(0); - AccessToken.DUMMY_TOKEN.write(out); - + Sender.opWriteBlock(out, block.getBlock().getBlockId(), + block.getBlock().getGenerationStamp(), 1, + BlockConstructionStage.PIPELINE_SETUP_CREATE, + 0L, 0L, 0L, "", null, new DatanodeInfo[0], + AccessToken.DUMMY_TOKEN); + // write check header out.writeByte( 1 ); out.writeInt( 512 ); Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java?rev=820062&r1=820061&r2=820062&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (original) +++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Tue Sep 29 18:37:21 2009 @@ -41,6 +41,25 @@ final private static int RUR = 4; final private static int NON_EXISTENT = 5; + // test close + @Test + public void testClose() throws Exception { + MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null); + try { + cluster.waitActive(); + DataNode dn = cluster.getDataNodes().get(0); + FSDataset dataSet = (FSDataset)dn.data; + + // set up replicasMap + setup(dataSet); + + // test close + testClose(dataSet); + } finally { + cluster.shutdown(); + } + } + // test append @Test public void testAppend() throws Exception { @@ -79,7 +98,7 @@ } } - // test writeToRbw + // test writeToTemporary @Test public void testWriteToTempoary() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null); @@ -113,9 +132,8 @@ blocks[TEMPORARY].getGenerationStamp(), vol, vol.createTmpFile(blocks[TEMPORARY]).getParentFile())); - replicaInfo = new ReplicaBeingWritten(blocks[RBW].getBlockId(), - blocks[RBW].getGenerationStamp(), vol, - vol.createRbwFile(blocks[RBW]).getParentFile()); + replicaInfo = new ReplicaBeingWritten(blocks[RBW], vol, + vol.createRbwFile(blocks[RBW]).getParentFile(), null); replicasMap.add(replicaInfo); replicaInfo.getBlockFile().createNewFile(); replicaInfo.getMetaFile().createNewFile(); @@ -127,8 +145,10 @@ } private void testAppend(FSDataset dataSet) throws IOException { - dataSet.append(blocks[FINALIZED], blocks[FINALIZED].getGenerationStamp()+1, + long newGS = blocks[FINALIZED].getGenerationStamp()+1; + dataSet.append(blocks[FINALIZED], newGS, blocks[FINALIZED].getNumBytes()); // successful + blocks[FINALIZED].setGenerationStamp(newGS); try { dataSet.append(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, @@ -177,8 +197,106 @@ Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA + blocks[NON_EXISTENT], e.getMessage()); } + + newGS = blocks[FINALIZED].getGenerationStamp()+1; + dataSet.recoverAppend(blocks[FINALIZED], newGS, + blocks[FINALIZED].getNumBytes()); // successful + blocks[FINALIZED].setGenerationStamp(newGS); + + try { + dataSet.recoverAppend(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, + 
blocks[TEMPORARY].getNumBytes()); + Assert.fail("Should not have appended to a temporary replica " + + blocks[TEMPORARY]); + } catch (ReplicaNotFoundException e) { + Assert.assertTrue(e.getMessage().startsWith( + ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA)); + } + + newGS = blocks[RBW].getGenerationStamp()+1; + dataSet.recoverAppend(blocks[RBW], newGS, blocks[RBW].getNumBytes()); + blocks[RBW].setGenerationStamp(newGS); + + try { + dataSet.recoverAppend(blocks[RWR], blocks[RWR].getGenerationStamp()+1, + blocks[RBW].getNumBytes()); + Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]); + } catch (ReplicaNotFoundException e) { + Assert.assertTrue(e.getMessage().startsWith( + ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA)); + } + + try { + dataSet.recoverAppend(blocks[RUR], blocks[RUR].getGenerationStamp()+1, + blocks[RUR].getNumBytes()); + Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]); + } catch (ReplicaNotFoundException e) { + Assert.assertTrue(e.getMessage().startsWith( + ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA)); + } + + try { + dataSet.recoverAppend(blocks[NON_EXISTENT], + blocks[NON_EXISTENT].getGenerationStamp(), + blocks[NON_EXISTENT].getNumBytes()); + Assert.fail("Should not have appended to a non-existent replica " + + blocks[NON_EXISTENT]); + } catch (ReplicaNotFoundException e) { + Assert.assertTrue(e.getMessage().startsWith( + ReplicaNotFoundException.NON_EXISTENT_REPLICA)); + } } + private void testClose(FSDataset dataSet) throws IOException { + long newGS = blocks[FINALIZED].getGenerationStamp()+1; + dataSet.recoverClose(blocks[FINALIZED], newGS, + blocks[FINALIZED].getNumBytes()); // successful + blocks[FINALIZED].setGenerationStamp(newGS); + + try { + dataSet.recoverClose(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1, + blocks[TEMPORARY].getNumBytes()); + Assert.fail("Should not have recovered close a temporary replica " + + blocks[TEMPORARY]); + } catch (ReplicaNotFoundException e) { + Assert.assertTrue(e.getMessage().startsWith( + ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA)); + } + + newGS = blocks[RBW].getGenerationStamp()+1; + dataSet.recoverClose(blocks[RBW], newGS, blocks[RBW].getNumBytes()); + blocks[RBW].setGenerationStamp(newGS); + + try { + dataSet.recoverClose(blocks[RWR], blocks[RWR].getGenerationStamp()+1, + blocks[RBW].getNumBytes()); + Assert.fail("Should not have recovered close an RWR replica" + blocks[RWR]); + } catch (ReplicaNotFoundException e) { + Assert.assertTrue(e.getMessage().startsWith( + ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA)); + } + + try { + dataSet.recoverClose(blocks[RUR], blocks[RUR].getGenerationStamp()+1, + blocks[RUR].getNumBytes()); + Assert.fail("Should not have recovered close an RUR replica" + blocks[RUR]); + } catch (ReplicaNotFoundException e) { + Assert.assertTrue(e.getMessage().startsWith( + ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA)); + } + + try { + dataSet.recoverClose(blocks[NON_EXISTENT], + blocks[NON_EXISTENT].getGenerationStamp(), + blocks[NON_EXISTENT].getNumBytes()); + Assert.fail("Should not have recovered close a non-existent replica " + + blocks[NON_EXISTENT]); + } catch (ReplicaNotFoundException e) { + Assert.assertTrue(e.getMessage().startsWith( + ReplicaNotFoundException.NON_EXISTENT_REPLICA)); + } + } + private void testWriteToRbw(FSDataset dataSet) throws IOException { try { dataSet.recoverRbw(blocks[FINALIZED],