From: rangadi@apache.org
To: core-commits@hadoop.apache.org
Reply-To: core-dev@hadoop.apache.org
Subject: svn commit: r656118 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date: Wed, 14 May 2008 06:32:43 -0000

Author: rangadi
Date: Tue May 13 23:32:42 2008
New Revision: 656118

URL: http://svn.apache.org/viewvc?rev=656118&view=rev
Log:
HADOOP-1702. Reduce buffer copies when data is written to DFS.
DataNodes take 30% less CPU while writing data. (rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 13 23:32:42 2008
@@ -140,6 +140,9 @@

     HADOOP-3369. Fast block processing during name-node startup. (shv)

+    HADOOP-1702. Reduce buffer copies when data is written to DFS.
+    DataNodes take 30% less CPU while writing data. (rangadi)
+
   BUG FIXES

     HADOOP-2905. 'fsck -move' triggers NPE in NameNode.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue May 13 23:32:42 2008
@@ -39,6 +39,7 @@
 import java.util.zip.CRC32;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;

 import javax.net.SocketFactory;
@@ -71,6 +72,7 @@
   private short defaultReplication;
   private SocketFactory socketFactory;
   private int socketTimeout;
+  final int writePacketSize;
   private FileSystem.Statistics stats;

   /**
@@ -144,6 +146,8 @@
     this.socketTimeout = conf.getInt("dfs.socket.timeout",
                                      FSConstants.READ_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
+    // dfs.write.packet.size is an internal config variable
+    this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);

     try {
       this.ugi = UnixUserGroupInformation.login(conf, true);
@@ -1683,7 +1687,6 @@
     private DataInputStream blockReplyStream;
     private Block block;
     private long blockSize;
-    private int buffersize;
     private DataChecksum checksum;
     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
     private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
@@ -1694,10 +1697,8 @@
     private ResponseProcessor response = null;
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes written in current block
-    private int packetSize = 0;
+    private int packetSize = 0; // write packet size, including the header.
     private int chunksPerPacket = 0;
-    private int chunksPerBlock = 0;
-    private int chunkSize = 0;
     private DatanodeInfo[] nodes = null; // list of targets for current block
     private volatile boolean hasError = false;
     private volatile int errorIndex = 0;
@@ -1707,56 +1708,95 @@
     private boolean persistBlocks = false; // persist blocks on namenode

     private class Packet {
-      ByteBuffer buffer;
+      ByteBuffer buffer;         // only one of buf and buffer is non-null
+      byte[]     buf;
       long    seqno;             // sequence number of buffer in block
       long    offsetInBlock;     // offset in block
       boolean lastPacketInBlock; // is this the last packet in block?
       int     numChunks;         // number of chunks currently in packet
-      int     flushOffsetBuffer; // last full chunk that was flushed
-      long    flushOffsetBlock;  // block offset of last full chunk flushed
+      int     dataStart;
+      int     dataPos;
+      int     checksumStart;
+      int     checksumPos;

       // create a new packet
-      Packet(int size, long offsetInBlock) {
-        buffer = ByteBuffer.allocate(size);
-        buffer.clear();
+      Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
         this.lastPacketInBlock = false;
         this.numChunks = 0;
         this.offsetInBlock = offsetInBlock;
         this.seqno = currentSeqno;
-        this.flushOffsetBuffer = 0;
-        this.flushOffsetBlock = 0;
         currentSeqno++;
+
+        buffer = null;
+        buf = new byte[pktSize];
+
+        checksumStart = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+        checksumPos = checksumStart;
+        dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+        dataPos = dataStart;
       }
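For reference, a minimal standalone sketch of the layout arithmetic the new Packet
constructor sets up (the class name and printed labels are illustrative, not part of
the patch); it assumes the common defaults of 512 bytes per checksum and CRC32's
4-byte checksums, giving 127 chunks per 64K packet:

    // Sketch: how Packet carves one byte[] into header, checksum and data
    // regions. PKT_HEADER_LEN and SIZE_OF_INTEGER mirror the patch's constants.
    public class PacketLayoutSketch {
      static final int PKT_HEADER_LEN = 4 + 8 + 8 + 1; // pktLen, offset, seqno, last-flag
      static final int SIZE_OF_INTEGER = 4;            // the dataLen field

      public static void main(String[] args) {
        int checksumSize = 4;      // CRC32
        int chunksPerPkt = 127;    // matches a 64K write packet size
        int checksumStart = PKT_HEADER_LEN + SIZE_OF_INTEGER;        // 25
        int dataStart = checksumStart + chunksPerPkt * checksumSize; // 533
        System.out.println("header+dataLen: bytes [0.." + checksumStart + ")");
        System.out.println("checksums:      bytes [" + checksumStart + ".." + dataStart + ")");
        System.out.println("data:           bytes [" + dataStart + "..)");
      }
    }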
-      // create a new Packet with the contents copied from the
-      // specified one. Shares the same buffer.
-      Packet(Packet old) {
-        this.buffer = old.buffer;
-        this.lastPacketInBlock = old.lastPacketInBlock;
-        this.numChunks = old.numChunks;
-        this.offsetInBlock = old.offsetInBlock;
-        this.seqno = old.seqno;
-        this.flushOffsetBuffer = old.flushOffsetBuffer;
-        this.flushOffsetBlock = old.flushOffsetBlock;
-      }
-
-      // writes len bytes from offset off in inarray into
-      // this packet.
-      //
-      void write(byte[] inarray, int off, int len) {
-        buffer.put(inarray, off, len);
+      void writeData(byte[] inarray, int off, int len) {
+        if (dataPos + len > buf.length) {
+          throw new BufferOverflowException();
+        }
+        System.arraycopy(inarray, off, buf, dataPos, len);
+        dataPos += len;
       }

-      // writes an integer into this packet.
-      //
-      void writeInt(int value) {
-        buffer.putInt(value);
-      }
-
-      // sets the last flush offset of this packet.
-      void setFlushOffset(int bufoff, long blockOff) {
-        this.flushOffsetBuffer = bufoff;;
-        this.flushOffsetBlock = blockOff;
+      void writeChecksum(byte[] inarray, int off, int len) {
+        if (checksumPos + len > dataStart) {
+          throw new BufferOverflowException();
+        }
+        System.arraycopy(inarray, off, buf, checksumPos, len);
+        checksumPos += len;
+      }
+
+      /**
+       * Returns a ByteBuffer that contains one full packet, including header.
+       */
+      ByteBuffer getBuffer() {
+        /* Once this is called, no more data can be added to the packet.
+         * Setting 'buf' to null ensures that.
+         * This is called only when the packet is ready to be sent.
+         */
+        if (buffer != null) {
+          return buffer;
+        }
+
+        //prepare the header and close any gap between checksum and data.
+
+        int dataLen = dataPos - dataStart;
+        int checksumLen = checksumPos - checksumStart;
+
+        if (checksumPos != dataStart) {
+          /* move the checksum to cover the gap.
+           * This can happen for the last packet.
+           */
+          System.arraycopy(buf, checksumStart, buf,
+                           dataStart - checksumLen, checksumLen);
+        }
+
+        int pktLen = SIZE_OF_INTEGER + dataLen + checksumLen;
+
+        //normally dataStart == checksumPos, i.e., offset is zero.
+        buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
+                                 DataNode.PKT_HEADER_LEN + pktLen);
+        buf = null;
+        buffer.mark();
+
+        /* write the header and data length.
+         * The format is described in the comment before DataNode.BlockSender.
+         */
+        buffer.putInt(pktLen);  // pktSize
+        buffer.putLong(offsetInBlock);
+        buffer.putLong(seqno);
+        buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
+        //end of pkt header
+        buffer.putInt(dataLen); // actual data length, excluding checksum.
+
+        buffer.reset();
+        return buffer;
       }
     }
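The interesting step in getBuffer() is closing the gap: when a packet is not full,
the checksum region ends short of dataStart, so the checksums are slid down to sit
flush against the data before the buffer is wrapped over the contiguous span. A
hypothetical helper that isolates just that arraycopy-and-wrap idea (the arithmetic
matches the patch, but this class is illustrative only):

    import java.nio.ByteBuffer;

    class GapCloseSketch {
      // headerAndLenBytes corresponds to PKT_HEADER_LEN + SIZE_OF_INTEGER.
      static ByteBuffer contiguous(byte[] buf, int headerAndLenBytes,
                                   int checksumStart, int checksumPos,
                                   int dataStart, int dataPos) {
        int checksumLen = checksumPos - checksumStart;
        int dataLen = dataPos - dataStart;
        if (checksumPos != dataStart) {
          // close the gap left by unused checksum slots
          System.arraycopy(buf, checksumStart, buf,
                           dataStart - checksumLen, checksumLen);
        }
        int start = dataStart - checksumLen - headerAndLenBytes;
        return ByteBuffer.wrap(buf, start, headerAndLenBytes + checksumLen + dataLen);
      }
    }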
" + @@ -1839,6 +1867,8 @@ " Aborting file " + src); } + ByteBuffer buf = one.getBuffer(); + // move packet from dataQueue to ackQueue dataQueue.removeFirst(); dataQueue.notifyAll(); @@ -1846,22 +1876,21 @@ ackQueue.addLast(one); ackQueue.notifyAll(); } - + // write out data to remote datanode - blockStream.writeInt(len); // size of this packet - blockStream.writeLong(offsetInBlock); // data offset in block - blockStream.writeLong(one.seqno); // sequence num of packet - blockStream.writeBoolean(one.lastPacketInBlock); - blockStream.write(arr, start, len); + blockStream.write(buf.array(), buf.position(), buf.remaining()); + if (one.lastPacketInBlock) { blockStream.writeInt(0); // indicate end-of-block } blockStream.flush(); - LOG.debug("DataStreamer block " + block + - " wrote packet seqno:" + one.seqno + - " size:" + len + - " offsetInBlock:" + offsetInBlock + - " lastPacketInBlock:" + one.lastPacketInBlock); + if (LOG.isDebugEnabled()) { + LOG.debug("DataStreamer block " + block + + " wrote packet seqno:" + one.seqno + + " size:" + buf.remaining() + + " offsetInBlock:" + one.offsetInBlock + + " lastPacketInBlock:" + one.lastPacketInBlock); + } } catch (IOException e) { LOG.warn("DataStreamer Exception: " + e); hasError = true; @@ -2138,7 +2167,6 @@ super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4); this.src = src; this.blockSize = blockSize; - this.buffersize = buffersize; this.progress = progress; if (progress != null) { LOG.debug("Set non-null progress callback on DFSOutputStream "+src); @@ -2154,11 +2182,11 @@ } checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, bytesPerChecksum); - // A maximum of 128 chunks per packet, i.e. 64K packet size. - chunkSize = bytesPerChecksum + 2 * SIZE_OF_INTEGER; // user data & checksum - chunksPerBlock = (int)(blockSize / bytesPerChecksum); - chunksPerPacket = Math.min(chunksPerBlock, 128); - packetSize = chunkSize * chunksPerPacket; + int chunkSize = bytesPerChecksum + checksum.getChecksumSize(); + chunksPerPacket = Math.max((writePacketSize - DataNode.PKT_HEADER_LEN - + SIZE_OF_INTEGER + chunkSize-1)/chunkSize, 1); + packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER + + chunkSize * chunksPerPacket; try { namenode.create( @@ -2254,7 +2282,7 @@ // DataOutputStream out = new DataOutputStream( new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), - buffersize)); + DataNode.SMALL_BUFFER_SIZE)); blockReplyStream = new DataInputStream(NetUtils.getInputStream(s)); out.writeShort( DATA_TRANSFER_VERSION ); @@ -2351,12 +2379,6 @@ this.checksum.getChecksumSize() + " but found to be " + checksum.length); } - if (len + cklen + SIZE_OF_INTEGER > chunkSize) { - throw new IOException("writeChunk() found data of size " + - (len + cklen + 4) + - " that cannot be larger than chukSize " + - chunkSize); - } synchronized (dataQueue) { @@ -2370,30 +2392,30 @@ isClosed(); if (currentPacket == null) { - currentPacket = new Packet(packetSize, bytesCurBlock); + currentPacket = new Packet(packetSize, chunksPerPacket, + bytesCurBlock); LOG.debug("DFSClient writeChunk allocating new packet " + currentPacket.seqno); } - currentPacket.writeInt(len); - currentPacket.write(checksum, 0, cklen); - currentPacket.write(b, offset, len); + currentPacket.writeChecksum(checksum, 0, cklen); + currentPacket.writeData(b, offset, len); currentPacket.numChunks++; bytesCurBlock += len; // If packet is full, enqueue it for transmission // if (currentPacket.numChunks == chunksPerPacket || - bytesCurBlock == chunksPerBlock * bytesPerChecksum) { + 
@@ -2370,30 +2392,30 @@
         isClosed();

         if (currentPacket == null) {
-          currentPacket = new Packet(packetSize, bytesCurBlock);
+          currentPacket = new Packet(packetSize, chunksPerPacket,
+                                     bytesCurBlock);
           LOG.debug("DFSClient writeChunk allocating new packet " +
                     currentPacket.seqno);
         }

-        currentPacket.writeInt(len);
-        currentPacket.write(checksum, 0, cklen);
-        currentPacket.write(b, offset, len);
+        currentPacket.writeChecksum(checksum, 0, cklen);
+        currentPacket.writeData(b, offset, len);
         currentPacket.numChunks++;
         bytesCurBlock += len;

         // If packet is full, enqueue it for transmission
         //
         if (currentPacket.numChunks == chunksPerPacket ||
-            bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+            bytesCurBlock == blockSize) {
           LOG.debug("DFSClient writeChunk packet full seqno " +
                     currentPacket.seqno);
-          currentPacket.buffer.flip();
           //
           // if we allocated a new packet because we encountered a block
           // boundary, reset bytesCurBlock.
           //
-          if (bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+          if (bytesCurBlock == blockSize) {
             currentPacket.lastPacketInBlock = true;
             bytesCurBlock = 0;
+            lastFlushOffset = -1;
           }
           dataQueue.addLast(currentPacket);
           dataQueue.notifyAll();
@@ -2410,66 +2432,38 @@
     * datanode. Block allocations are persisted on namenode.
     */
    public synchronized void fsync() throws IOException {
-      Packet savePacket = null;
-      int position = 0;
-      long saveOffset = 0;
-
      try {
-        // Record the state of the current output stream.
-        // This state will be reverted after the flush successfully
-        // finishes. It is necessary to do this so that partial
-        // checksum chunks are reused by writes that follow this
-        // flush.
-        if (currentPacket != null) {
-          savePacket = new Packet(currentPacket);
-          position = savePacket.buffer.position();
-        }
-        saveOffset = bytesCurBlock;
+        /* Record the current blockOffset. This might be changed inside
+         * flushBuffer() where a partial checksum chunk might be flushed.
+         * After the flush, reset bytesCurBlock back to its previous value;
+         * any partial checksum chunk will be sent now and in the next packet.
+         */
+        long saveOffset = bytesCurBlock;

        // flush checksum buffer, but keep checksum buffer intact
        flushBuffer(true);

-        LOG.debug("DFSClient flushInternal save position " +
-                  position +
-                  " cur position " +
-                  ((currentPacket != null) ? currentPacket.buffer.position() : -1) +
-                  " limit " +
-                  ((currentPacket != null) ? currentPacket.buffer.limit() : -1) +
-                  " bytesCurBlock " + bytesCurBlock +
-                  " lastFlushOffset " + lastFlushOffset);
-
-        //
-        // Detect the condition that we have already flushed all
-        // outstanding data.
-        //
-        boolean skipFlush = (lastFlushOffset == bytesCurBlock &&
-                             savePacket != null && currentPacket != null &&
-                             savePacket.seqno == currentPacket.seqno);
+        LOG.debug("DFSClient flush() : saveOffset " + saveOffset +
+                  " bytesCurBlock " + bytesCurBlock +
+                  " lastFlushOffset " + lastFlushOffset);

-        // Do the flush.
-        //
-        if (!skipFlush) {
+        // Flush only if we haven't already flushed till this offset.
+        if (lastFlushOffset != bytesCurBlock) {
          // record the valid offset of this flush
          lastFlushOffset = bytesCurBlock;

          // wait for all packets to be sent and acknowledged
          flushInternal();
+        } else {
+          // just discard the current packet since it has already been sent.
+          currentPacket = null;
        }

        // Restore state of stream. Record the last flush offset
        // of the last full chunk that was flushed.
        //
        bytesCurBlock = saveOffset;
-        currentPacket = null;
-        if (savePacket != null) {
-          savePacket.buffer.limit(savePacket.buffer.capacity());
-          savePacket.buffer.position(position);
-          savePacket.setFlushOffset(position,
-                                    savePacket.numChunks *
-                                    checksum.getBytesPerChecksum());
-          currentPacket = savePacket;
-        }

        // If any new blocks were allocated since the last flush,
        // then persist block locations on namenode.
@@ -2501,7 +2495,6 @@
      // If there is data in the current buffer, send it across
      //
      if (currentPacket != null) {
-        currentPacket.buffer.flip();
        dataQueue.addLast(currentPacket);
        dataQueue.notifyAll();
        currentPacket = null;
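Condensed, the new fsync() bookkeeping works as sketched below; this is a paraphrase
using the patch's own field names, not the committed code:

    // save where we are in the block, flush, then restore so a partial
    // checksum chunk is carried over into the next packet
    long saveOffset = bytesCurBlock;
    flushBuffer(true);
    if (lastFlushOffset != bytesCurBlock) {
      lastFlushOffset = bytesCurBlock;   // new bytes since the last flush
      flushInternal();                   // wait for every packet to be acked
    } else {
      currentPacket = null;              // already on the wire; nothing to send
    }
    bytesCurBlock = saveOffset;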
@@ -2594,12 +2587,11 @@
      // packet with empty payload.
      synchronized (dataQueue) {
        if (currentPacket == null && bytesCurBlock != 0) {
-          currentPacket = new Packet(packetSize, bytesCurBlock);
-          currentPacket.writeInt(0); // one chunk with empty contents
+          currentPacket = new Packet(packetSize, chunksPerPacket,
+                                     bytesCurBlock);
        }
        if (currentPacket != null) {
          currentPacket.lastPacketInBlock = true;
-          currentPacket.setFlushOffset(0, 0); // send whole packet
        }
      }

@@ -2649,7 +2641,9 @@

    synchronized void setChunksPerPacket(int value) {
      chunksPerPacket = Math.min(chunksPerPacket, value);
-      packetSize = chunkSize * chunksPerPacket;
+      packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+                   (checksum.getBytesPerChecksum() +
+                    checksum.getChecksumSize()) * chunksPerPacket;
    }

    synchronized void setTestFilename(String newname) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue May 13 23:32:42 2008
@@ -129,6 +129,7 @@
  private int socketTimeout;
  private int socketWriteTimeout = 0;
  private boolean transferToAllowed = true;
+  private int writePacketSize = 0;

  DataBlockScanner blockScanner;
  Daemon blockScannerThread;
@@ -221,6 +222,7 @@
     * to false on some of them. */
    this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed",
                                             true);
+    this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);

    String address =
      NetUtils.getServerAddress(conf,
                                "dfs.datanode.bindAddress",
@@ -991,7 +993,8 @@
      DataInputStream in=null;
      try {
        in = new DataInputStream(
-            new BufferedInputStream(NetUtils.getInputStream(s), BUFFER_SIZE));
+            new BufferedInputStream(NetUtils.getInputStream(s),
+                                    SMALL_BUFFER_SIZE));
        short version = in.readShort();
        if ( version != DATA_TRANSFER_VERSION ) {
          throw new IOException( "Version Mismatch" );
@@ -1174,7 +1177,7 @@
          mirrorOut = new DataOutputStream(
             new BufferedOutputStream(
                         NetUtils.getOutputStream(mirrorSock, writeTimeout),
-                        BUFFER_SIZE));
+                        SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));

          // Write header: Copied from DFSClient.java!
@@ -1603,6 +1606,12 @@
   ************************************************************************
   */

+  /** Header size for a packet */
+  static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
+                                      8 + /* offset in block */
+                                      8 + /* seqno */
+                                      1   /* isLastPacketInBlock */);
+
  class BlockSender implements java.io.Closeable {
    private Block block; // the block to read from
    private InputStream blockIn; // data stream
@@ -1622,12 +1631,6 @@
    private boolean verifyChecksum; //if true, check is verified while reading
    private Throttler throttler;

-    static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
-                                        8 + /* offset in block */
-                                        8 + /* seqno */
-                                        1 + /* isLastPacketInBlock */
-                                        4   /* data len */ );
-
    BlockSender(Block block, long startOffset, long length,
                boolean corruptChecksumOk, boolean chunkOffsetOK,
                boolean verifyChecksum) throws IOException {
@@ -1873,7 +1876,7 @@
      out.flush();

      int maxChunksPerPacket;
-      int pktSize;
+      int pktSize = PKT_HEADER_LEN + SIZE_OF_INTEGER;

      if (transferToAllowed && !verifyChecksum &&
          baseStream instanceof SocketOutputStream &&
@@ -1891,12 +1894,11 @@
                               + bytesPerChecksum - 1)/bytesPerChecksum;

        // allocate smaller buffer while using transferTo().
-        pktSize = PKT_HEADER_LEN + checksumSize * maxChunksPerPacket;
+        pktSize += checksumSize * maxChunksPerPacket;
      } else {
        maxChunksPerPacket = Math.max(1,
                 (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
-        pktSize = PKT_HEADER_LEN +
-                  (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
      }

      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
@@ -2200,39 +2202,6 @@
    }
  }

-  // this class is a bufferoutputstream that exposes the number of
-  // bytes in the buffer.
-  static private class DFSBufferedOutputStream extends BufferedOutputStream {
-    OutputStream out;
-
-    DFSBufferedOutputStream(OutputStream out, int capacity) {
-      super(out, capacity);
-      this.out = out;
-    }
-
-    public synchronized void flush() throws IOException {
-      super.flush();
-    }
-
-    /**
-     * Returns true if the channel pointer is already set at the
-     * specified offset. Otherwise returns false.
-     */
-    synchronized boolean samePosition(FSDatasetInterface data,
-                                      FSDataset.BlockWriteStreams streams,
-                                      Block block,
-                                      long offset)
-                                      throws IOException {
-      if (data.getChannelPosition(block, streams) + count == offset) {
-        return true;
-      }
-      LOG.debug("samePosition is false. " +
-                " current position " + data.getChannelPosition(block, streams)+
-                " buffered size " + count +
-                " new offset " + offset);
-      return false;
-    }
-  }
-
  /* A class that receives a block and writes it to its own disk, meanwhile
   * it may copy it to another site. If a throttler is provided,
   * streaming throttling is also supported.
   */
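The 21-byte header that PKT_HEADER_LEN describes, followed by the 4-byte data-length
field, can be exercised with a small round-trip. The demo class below is
illustrative only; the example payload length assumes one 512-byte chunk with a
4-byte CRC32 checksum (4 + 512 + 4 = 520):

    import java.io.*;

    public class PktHeaderDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(520);        // payload: dataLen field + checksum + data
        out.writeLong(0L);        // offset in block
        out.writeLong(100L);      // sequence number
        out.writeBoolean(false);  // last packet in block?
        out.writeInt(512);        // data length, excluding checksums
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray()));
        System.out.println("payloadLen=" + in.readInt() +
            " offset=" + in.readLong() + " seqno=" + in.readLong() +
            " last=" + in.readBoolean() + " dataLen=" + in.readInt());
      }
    }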
@@ -2242,13 +2211,13 @@
    private boolean finalized;
    private DataInputStream in = null; // from where data are read
    private DataChecksum checksum; // from where chunks of a block can be read
-    private DataOutputStream out = null; // to block file at local disk
+    private OutputStream out = null; // to block file at local disk
    private DataOutputStream checksumOut = null; // to crc file at local disk
-    private DFSBufferedOutputStream bufStream = null;
    private int bytesPerChecksum;
    private int checksumSize;
-    private byte buf[];
-    private byte checksumBuf[];
+    private ByteBuffer buf; // contains one full packet.
+    private int bufRead; //amount of valid data in the buf
+    private int maxPacketReadLen;
    private long offsetInBlock;
    final private String inAddr;
    private String mirrorAddr;
@@ -2272,19 +2241,16 @@
        this.checksum = DataChecksum.newDataChecksum(in);
        this.bytesPerChecksum = checksum.getBytesPerChecksum();
        this.checksumSize = checksum.getChecksumSize();
-        this.buf = new byte[bytesPerChecksum + checksumSize];
-        this.checksumBuf = new byte[checksumSize];
        //
        // Open local disk out
        //
        streams = data.writeToBlock(block, isRecovery);
        this.finalized = data.isValidBlock(block);
        if (streams != null) {
-          this.bufStream = new DFSBufferedOutputStream(
-                                          streams.dataOut, BUFFER_SIZE);
-          this.out = new DataOutputStream(bufStream);
+          this.out = streams.dataOut;
          this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-                                        streams.checksumOut, BUFFER_SIZE));
+                                                 streams.checksumOut,
+                                                 SMALL_BUFFER_SIZE));
        }
      } catch(IOException ioe) {
        IOUtils.closeStream(this);
@@ -2351,174 +2317,249 @@
      }
    }

-    /* receive a chunk: write it to disk & mirror it to another stream */
-    private void receiveChunk( int len, byte[] checksumBuf, int checksumOff )
-                              throws IOException {
-      if (len <= 0 || len > bytesPerChecksum) {
-        throw new IOException("Got wrong length during writeBlock(" + block
-            + ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
-            + " expected <= " + bytesPerChecksum);
-      }
+    /**
+     * Verify multiple CRC chunks.
+     */
+    private void verifyChunks( byte[] dataBuf, int dataOff, int len,
+                               byte[] checksumBuf, int checksumOff )
+                               throws IOException {
+      while (len > 0) {
+        int chunkLen = Math.min(len, bytesPerChecksum);
+
+        checksum.update(dataBuf, dataOff, chunkLen);

-      in.readFully(buf, 0, len);
-
-      /*
-       * Verification is not included in the initial design. For now, it at
-       * least catches some bugs. Later, we can include this after showing that
-       * it does not affect performance much.
-       */
-      checksum.update(buf, 0, len);
+        if (!checksum.compare(checksumBuf, checksumOff)) {
+          throw new IOException("Unexpected checksum mismatch " +
+                                "while writing " + block + " from " + inAddr);
+        }

-      if (!checksum.compare(checksumBuf, checksumOff)) {
-        throw new IOException("Unexpected checksum mismatch "
-            + "while writing " + block + " from " + inAddr);
+        checksum.reset();
+        dataOff += chunkLen;
+        checksumOff += checksumSize;
+        len -= chunkLen;
      }
+    }

-      checksum.reset();
-      offsetInBlock += len;
-
-      // First write to remote node before writing locally.
-      if (mirrorOut != null) {
-        try {
-          mirrorOut.writeInt(len);
-          mirrorOut.write(checksumBuf, checksumOff, checksumSize);
-          mirrorOut.write(buf, 0, len);
-        } catch (IOException ioe) {
-          handleMirrorOutError(ioe);
+    /**
+     * Makes sure buf.position() is zero without modifying buf.remaining().
+     * It moves the data if the position needs to be changed.
+     */
+    private void shiftBufData() {
+      if (bufRead != buf.limit()) {
+        throw new IllegalStateException("bufRead should be the same as " +
+                                        "buf.limit()");
      }
+
+      //shift the remaining data in buf to the front
+      if (buf.position() > 0) {
+        int dataLeft = buf.remaining();
+        if (dataLeft > 0) {
+          byte[] b = buf.array();
+          System.arraycopy(b, buf.position(), b, 0, dataLeft);
+        }
+        buf.position(0);
+        bufRead = dataLeft;
+        buf.limit(bufRead);
+      }
+    }
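shiftBufData() above is essentially a hand-rolled buffer compaction. A minimal
illustration of the same move (java.nio's ByteBuffer.compact() is a close cousin,
but it leaves position at the end of the copied data, which is why the patch keeps
its own bookkeeping); the class and method names here are hypothetical:

    import java.nio.ByteBuffer;

    class ShiftSketch {
      // Move the unread bytes of a heap buffer to the front so that
      // position() becomes 0 while remaining() stays unchanged.
      static void shiftToFront(ByteBuffer buf) {
        if (buf.position() > 0) {
          int dataLeft = buf.remaining();
          if (dataLeft > 0) {
            System.arraycopy(buf.array(), buf.position(), buf.array(), 0, dataLeft);
          }
          buf.position(0);
          buf.limit(dataLeft);
        }
      }
    }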
+
+    /**
+     * Reads up to toRead bytes into buf at buf.limit() and increments the
+     * limit. Throws an IOException if the read does not succeed.
+     */
+    private int readToBuf(int toRead) throws IOException {
+      if (toRead < 0) {
+        toRead = (maxPacketReadLen > 0 ?
+                  maxPacketReadLen : buf.capacity())
+                 - buf.limit();
+      }
+
+      int nRead = in.read(buf.array(), buf.limit(), toRead);
+
+      if (nRead < 0) {
+        throw new EOFException("while trying to read " + toRead + " bytes");
+      }
+      bufRead = buf.limit() + nRead;
+      buf.limit(bufRead);
+      return nRead;
+    }
+
+
+    /**
+     * Reads (at least) one packet and returns the packet length.
+     * buf.position() points to the start of the packet and
+     * buf.limit() points to the end of the packet. There could
+     * be more data from the next packet in buf.
+     *
+     * It tries to read a full packet with a single read call.
+     * Consecutive packets are usually of the same length.
+     */
+    private int readNextPacket() throws IOException {
+      /* This dances around buf a little bit, mainly to read a
+       * full packet with a single read and to accept an arbitrary size
+       * for the next packet at the same time.
+       */
+      if (buf == null) {
+        /* initialize buffer to the best guess size:
+         * 'chunksPerPacket' calculation here should match the same
+         * calculation in DFSClient to make the guess accurate.
+         */
+        int chunkSize = bytesPerChecksum + checksumSize;
+        int chunksPerPacket = (writePacketSize - PKT_HEADER_LEN -
+                               SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
+        buf = ByteBuffer.allocate(PKT_HEADER_LEN + SIZE_OF_INTEGER +
+                                  Math.max(chunksPerPacket, 1) * chunkSize);
+        buf.limit(0);
+      }
+
+      // See if there is data left in the buffer :
+      if (bufRead > buf.limit()) {
+        buf.limit(bufRead);
+      }
+
+      while (buf.remaining() < SIZE_OF_INTEGER) {
+        if (buf.position() > 0) {
+          shiftBufData();
        }
+        readToBuf(-1);
      }
-
-      try {
-        if (!finalized) {
-          out.write(buf, 0, len);
-          // Write checksum
-          checksumOut.write(checksumBuf, checksumOff, checksumSize);
-          myMetrics.bytesWritten.inc(len);
+
+      /* We mostly have the full packet or at least enough for an int
+       */
+      buf.mark();
+      int payloadLen = buf.getInt();
+      buf.reset();
+
+      if (payloadLen == 0) {
+        //end of stream!
+        buf.limit(buf.position() + SIZE_OF_INTEGER);
+        return 0;
+      }
+
+      // check corrupt values for pktLen, 100MB upper limit should be ok?
+      if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
+        throw new IOException("Incorrect value for packet payload : " +
+                              payloadLen);
+      }
+
+      int pktSize = payloadLen + PKT_HEADER_LEN;
+
+      if (buf.remaining() < pktSize) {
+        //we need to read more data
+        int toRead = pktSize - buf.remaining();
+
+        // first make sure buf has enough space.
+        int spaceLeft = buf.capacity() - buf.limit();
+        if (toRead > spaceLeft && buf.position() > 0) {
+          shiftBufData();
+          spaceLeft = buf.capacity() - buf.limit();
+        }
+        if (toRead > spaceLeft) {
+          byte oldBuf[] = buf.array();
+          int toCopy = buf.limit();
+          buf = ByteBuffer.allocate(toCopy + toRead);
+          System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
+          buf.limit(toCopy);
+        }
+
+        //now read:
+        while (toRead > 0) {
+          toRead -= readToBuf(toRead);
        }
-      } catch (IOException iex) {
-        checkDiskError(iex);
-        throw iex;
      }
-
-      if (throttler != null) { // throttle I/O
-        throttler.throttle(len + checksumSize + 4);
+
+      if (buf.remaining() > pktSize) {
+        buf.limit(buf.position() + pktSize);
+      }
+
+      if (pktSize > maxPacketReadLen) {
+        maxPacketReadLen = pktSize;
      }
+
+      return payloadLen;
+    }
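The core of readNextPacket() is a peek-then-fill pattern on the ByteBuffer;
isolated, it looks like the sketch below (a hypothetical helper, shown only to
highlight the mark()/reset() trick that reads the length prefix without consuming
it):

    import java.nio.ByteBuffer;

    class PeekSketch {
      // Read the 4-byte payload length without consuming it; the caller can
      // then keep filling the buffer until the whole packet has arrived.
      static int peekPayloadLen(ByteBuffer buf) {
        buf.mark();
        int payloadLen = buf.getInt();
        buf.reset();
        return payloadLen;
      }
    }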
-
-    /*
-     * Receive and process a packet. It contains many chunks.
+
+    /**
+     * Receives and processes a packet. It can contain many chunks.
+     * Returns the size of the packet.
      */
-    private void receivePacket(int packetSize) throws IOException {
-      /* TEMP: Currently this handles both interleaved
-       * and non-interleaved DATA_CHUNKs in side the packet.
-       * non-interleaved is required for HADOOP-2758 and in future.
-       * iterleaved will be removed once extra buffer copies are removed
-       * in write path (HADOOP-1702).
-       *
-       * Format of Non-interleaved data packets is described in the
-       * comment before BlockSender.
-       */
-      offsetInBlock = in.readLong(); // get offset of packet in block
-      long seqno = in.readLong(); // get seqno
-      boolean lastPacketInBlock = in.readBoolean();
-      int curPacketSize = 0;
-      LOG.debug("Receiving one packet for block " + block +
-                " of size " + packetSize +
-                " seqno " + seqno +
-                " offsetInBlock " + offsetInBlock +
-                " lastPacketInBlock " + lastPacketInBlock);
+    private int receivePacket() throws IOException {
+
+      int payloadLen = readNextPacket();
+
+      if (payloadLen <= 0) {
+        return payloadLen;
+      }
+
+      buf.mark();
+      //read the header
+      buf.getInt(); // packet length
+      offsetInBlock = buf.getLong(); // get offset of packet in block
+      long seqno = buf.getLong(); // get seqno
+      boolean lastPacketInBlock = (buf.get() != 0);
+
+      int endOfHeader = buf.position();
+      buf.reset();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Receiving one packet for block " + block +
+                  " of length " + payloadLen +
+                  " seqno " + seqno +
+                  " offsetInBlock " + offsetInBlock +
+                  " lastPacketInBlock " + lastPacketInBlock);
+      }
+
      setBlockPosition(offsetInBlock);
-
-      int len = in.readInt();
-      curPacketSize += 4;            // read an integer in previous line
-
-      // send packet header to next datanode in pipeline
+
+      //First write the packet to the mirror:
      if (mirrorOut != null) {
        try {
-          int mirrorPacketSize = packetSize;
-          if (len > bytesPerChecksum) {
-            /*
-             * This is a packet with non-interleaved checksum.
-             * But we are sending interleaving checksums to mirror,
-             * which changes packet len. Adjust the packet size for mirror.
-             *
-             * As mentioned above, this is mismatch is
-             * temporary till HADOOP-1702.
-             */
-
-            //find out how many chunks are in this patcket :
-            int chunksInPkt = (len + bytesPerChecksum - 1)/bytesPerChecksum;
-
-            // we send 4 more bytes for for each of the extra
-            // checksum chunks. so :
-            mirrorPacketSize += (chunksInPkt - 1) * 4;
-          }
-          mirrorOut.writeInt(mirrorPacketSize);
-          mirrorOut.writeLong(offsetInBlock);
-          mirrorOut.writeLong(seqno);
-          mirrorOut.writeBoolean(lastPacketInBlock);
+          mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+          mirrorOut.flush();
        } catch (IOException e) {
          handleMirrorOutError(e);
        }
      }

+      buf.position(endOfHeader);
+      int len = buf.getInt();
+
+      if (len < 0) {
+        throw new IOException("Got wrong length during writeBlock(" + block +
+                              ") from " + inAddr + " at offset " +
+                              offsetInBlock + ": " + len);
+      }
+
      if (len == 0) {
-        LOG.info("Receiving empty packet for block " + block);
-        if (mirrorOut != null) {
-          try {
-            mirrorOut.writeInt(len);
-            mirrorOut.flush();
-          } catch (IOException e) {
-            handleMirrorOutError(e);
-          }
-        }
-      }
+        LOG.debug("Receiving empty packet for block " + block);
+      } else {
+        offsetInBlock += len;

-      while (len != 0) {
-        int checksumOff = 0;
-        if (len > 0) {
-          int checksumLen = (len + bytesPerChecksum - 1)/bytesPerChecksum*
-                            checksumSize;
-          if (checksumBuf.length < checksumLen) {
-            checksumBuf = new byte[checksumLen];
-          }
-          // read the checksum
-          in.readFully(checksumBuf, 0, checksumLen);
-        }
-
-        while (len != 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Receiving one chunk for block " + block +
-                      " of size " + len);
-          }
-
-          int toRecv = Math.min(len, bytesPerChecksum);
-
-          curPacketSize += (toRecv + checksumSize);
-          if (curPacketSize > packetSize) {
-            throw new IOException("Packet size for block " + block +
-                                  " too long " + curPacketSize +
-                                  " was expecting " + packetSize);
-          }
-
-          receiveChunk(toRecv, checksumBuf, checksumOff);
-
-          len -= toRecv;
-          checksumOff += checksumSize;
+        int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
+                          checksumSize;
+
+        if (buf.remaining() != (checksumLen + len)) {
+          throw new IOException("Data remaining in packet does not match " +
+                                "sum of checksumLen and dataLen");
        }
-
-        if (curPacketSize == packetSize) {
-          if (mirrorOut != null) {
-            try {
-              mirrorOut.flush();
-            } catch (IOException e) {
-              handleMirrorOutError(e);
-            }
-          }
-          break;
+        int checksumOff = buf.position();
+        int dataOff = checksumOff + checksumLen;
+        byte pktBuf[] = buf.array();
+
+        buf.position(buf.limit()); // move to the end of the data.
+
+        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+
+        try {
+          if (!finalized) {
+            //finally write to the disk :
+            out.write(pktBuf, dataOff, len);
+            checksumOut.write(pktBuf, checksumOff, checksumLen);
+            myMetrics.bytesWritten.inc(len);
+          }
+        } catch (IOException iex) {
+          checkDiskError(iex);
+          throw iex;
        }
-        len = in.readInt();
-        curPacketSize += 4;
      }

      /// flush entire packet before sending ack
@@ -2529,6 +2570,12 @@
        ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                                           lastPacketInBlock);
      }
+
+      if (throttler != null) { // throttle I/O
+        throttler.throttle(payloadLen);
+      }
+
+      return payloadLen;
    }

    public void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
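Because checksums now arrive as one contiguous run followed by the data,
verifyChunks() can walk both regions of the same array in lock step. A toy
standalone version of that loop (CRC32 via java.util.zip; the class name and sizes
are chosen for illustration only):

    import java.util.zip.CRC32;

    public class ChunkCrcDemo {
      public static void main(String[] args) {
        int bytesPerChecksum = 512;
        byte[] data = new byte[1300];            // 3 chunks: 512 + 512 + 276
        java.util.Arrays.fill(data, (byte) 7);
        CRC32 crc = new CRC32();
        for (int off = 0; off < data.length; off += bytesPerChecksum) {
          int chunkLen = Math.min(bytesPerChecksum, data.length - off);
          crc.update(data, off, chunkLen);       // one checksum per chunk
          System.out.printf("chunk@%-4d len=%-3d crc=%08x%n",
                            off, chunkLen, crc.getValue());
          crc.reset();
        }
      }
    }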
@@ -2562,13 +2609,9 @@
      }

      /*
-       * Skim packet headers. A response is needed for every packet.
+       * Receive until packet length is zero.
       */
-      int len = in.readInt(); // get packet size
-      while (len != 0) {
-        receivePacket(len);
-        len = in.readInt(); // get packet size
-      }
+      while (receivePacket() > 0) {}

      // flush the mirror out
      if (mirrorOut != null) {
@@ -2637,8 +2680,9 @@
        }
        return;
      }
-      if (bufStream.samePosition(data, streams, block, offsetInBlock)) {
-        return;
+
+      if (data.getChannelPosition(block, streams) == offsetInBlock) {
+        return; // nothing to do
      }
      if (offsetInBlock % bytesPerChecksum != 0) {
        throw new IOException("setBlockPosition trying to set position to " +

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Tue May 13 23:32:42 2008
@@ -101,12 +101,11 @@
   * when protocol changes. It is not very obvious.
   */
  /*
-   * Version 9:
-   *    While reading data from Datanode, each PACKET can consist
-   *    of non-interleaved data (check for for larger amount of data,
-   *    followed by data).
+   * Version 10:
+   *    DFSClient also sends non-interleaved checksum and data while writing
+   *    to DFS.
   */
-  public static final int DATA_TRANSFER_VERSION = 9;
+  public static final int DATA_TRANSFER_VERSION = 10;

  // Return codes for file create
  public static final int OPERATION_FAILED = 0;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=656118&r1=656117&r2=656118&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Tue May 13 23:32:42 2008
@@ -204,7 +204,7 @@
    sendOut.writeInt(0);
    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
    sendOut.writeInt((int)512);
-    sendOut.writeInt(20);          // size of packet
+    sendOut.writeInt(4);           // size of packet
    sendOut.writeLong(0);           // OffsetInBlock
    sendOut.writeLong(100);         // sequencenumber
    sendOut.writeBoolean(false);    // lastPacketInBlock
@@ -229,7 +229,7 @@
    sendOut.writeInt(0);
    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
    sendOut.writeInt((int)512);     // checksum size
-    sendOut.writeInt(20);          // size of packet
+    sendOut.writeInt(8);           // size of packet
    sendOut.writeLong(0);           // OffsetInBlock
    sendOut.writeLong(100);         // sequencenumber
    sendOut.writeBoolean(true);     // lastPacketInBlock
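For version 10 of the data transfer protocol, the bytes a client emits per packet
follow the order the test above spells out. A hedged helper that produces the same
framing (the class, method, and parameter names are illustrative, not part of the
patch):

    import java.io.*;

    class PacketWriterSketch {
      static void writePacket(DataOutputStream out, long offsetInBlock,
                              long seqno, boolean last,
                              byte[] checksums, byte[] data) throws IOException {
        out.writeInt(4 + checksums.length + data.length); // payload length
        out.writeLong(offsetInBlock);                     // offset in block
        out.writeLong(seqno);                             // sequence number
        out.writeBoolean(last);                           // last packet in block?
        out.writeInt(data.length);   // data length, excluding checksums
        out.write(checksums);
        out.write(data);
      }
    }

This also explains the test's empty-packet size of 4: with no checksums and no
data, only the 4-byte data-length field remains in the payload.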