Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Reply-To: core-dev@hadoop.apache.org
To: core-commits@hadoop.apache.org
From: rangadi@apache.org
Subject: svn commit: r633285 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date: Mon, 03 Mar 2008 21:40:23 -0000
Message-Id: <20080303214032.271DF1A9832@eris.apache.org>

Author: rangadi
Date: Mon Mar 3 13:40:18 2008
New Revision: 633285

URL: http://svn.apache.org/viewvc?rev=633285&view=rev
Log:
HADOOP-2758. Reduce buffer copies in DataNode when data is read from
HDFS, without negatively affecting read throughput. (rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Mar 3 13:40:18 2008
@@ -65,6 +65,9 @@
     repetitive calls to get the current time and late checking to see if
     we want speculation on at all. (omalley)
 
+    HADOOP-2758. Reduce buffer copies in DataNode when data is read from
+    HDFS, without negatively affecting read throughput. (rangadi)
+
   BUG FIXES
 
    HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java Mon Mar 3 13:40:18 2008
@@ -341,7 +341,7 @@
 
     /* Send a block copy request to the outputstream*/
     private void sendRequest(DataOutputStream out) throws IOException {
-      out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+      out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
       out.writeByte(FSConstants.OP_COPY_BLOCK);
       out.writeLong(block.getBlock().getBlockId());
       Text.writeString(out, source.getStorageID());

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java Mon Mar 3 13:40:18 2008
@@ -853,7 +853,7 @@
     DataInputStream in = new DataInputStream(dnSock.getInputStream());
 
     // Write the header:
-    out.writeShort( DataNode.DATA_TRANFER_VERSION );
+    out.writeShort( DataNode.DATA_TRANSFER_VERSION );
     out.writeByte( DataNode.OP_READ_METADATA );
     out.writeLong( blockInfo.block.getBlockId() );

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Mar 3 13:40:18 2008
@@ -638,6 +638,7 @@
     private DataChecksum checksum;
     private long lastChunkOffset = -1;
     private long lastChunkLen = -1;
+    private long lastSeqNo = -1;
 
     private long startOffset;
     private long firstChunkOffset;
@@ -646,6 +647,9 @@
     private boolean gotEOS = false;
 
     byte[] skipBuf = null;
+    ByteBuffer checksumBytes = null;
+    int dataLeft = 0;
+    boolean isLastPacket = false;
 
     /* FSInputChecker interface */
@@ -722,6 +726,22 @@
                             "since seek is not required");
     }
 
+    /**
+     * Makes sure that checksumBytes has enough capacity
+     * and limit is set to the number of checksum bytes needed
+     * to be read.
+     */
+    private void adjustChecksumBytes(int dataLen) {
+      int requiredSize =
+        ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+      if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+        checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
+      } else {
+        checksumBytes.clear();
+      }
+      checksumBytes.limit(requiredSize);
+    }
+
     @Override
     protected synchronized int readChunk(long pos, byte[] buf, int offset,
                                          int len, byte[] checksumBuf)
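For illustration of the sizing arithmetic in adjustChecksumBytes() above, a standalone sketch; the numbers are examples, not values taken from this patch (512 is the usual io.bytes.per.checksum default, 4 the CRC32 checksum size):

    class ChecksumSizingSketch {
      public static void main(String[] args) {
        // Illustrative numbers: 4000 bytes of packet data, 512-byte
        // checksum chunks, 4-byte CRC32 checksums.
        int dataLen = 4000, bytesPerChecksum = 512, checksumSize = 4;
        // Round dataLen up to a whole number of chunks: ceil(4000/512) = 8.
        int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum;
        int requiredSize = numChunks * checksumSize;  // 8 * 4 = 32 checksum bytes
        System.out.println(numChunks + " chunks, " + requiredSize + " checksum bytes");
      }
    }
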
@@ -748,42 +768,60 @@
                               firstChunkOffset + " != " + chunkOffset);
       }
 
-      // The chunk is transmitted as one packet. Read packet headers.
-      int packetLen = in.readInt();
-      long offsetInBlock = in.readLong();
-      long seqno = in.readLong();
-      boolean lastPacketInBlock = in.readBoolean();
-      LOG.debug("DFSClient readChunk got seqno " + seqno +
-                " offsetInBlock " + offsetInBlock +
-                " lastPacketInBlock " + lastPacketInBlock +
-                " packetLen " + packetLen);
-
-      int chunkLen = in.readInt();
-
-      // Sanity check the lengths
-      if ( chunkLen < 0 || chunkLen > bytesPerChecksum ||
-           ( lastChunkLen >= 0 && // prev packet exists
-             ( (chunkLen > 0 && lastChunkLen != bytesPerChecksum) ||
-               chunkOffset != (lastChunkOffset + lastChunkLen) ) ) ) {
-        throw new IOException("BlockReader: error in chunk's offset " +
-                              "or length (" + chunkOffset + ":" +
-                              chunkLen + ")");
+      // Read next packet if the previous packet has been read completely.
+      if (dataLeft <= 0) {
+        //Read packet headers.
+        int packetLen = in.readInt();
+        long offsetInBlock = in.readLong();
+        long seqno = in.readLong();
+        boolean lastPacketInBlock = in.readBoolean();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DFSClient readChunk got seqno " + seqno +
+                    " offsetInBlock " + offsetInBlock +
+                    " lastPacketInBlock " + lastPacketInBlock +
+                    " packetLen " + packetLen);
+        }
+
+        int dataLen = in.readInt();
+
+        // Sanity check the lengths
+        if ( dataLen < 0 ||
+             ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+             (seqno != (lastSeqNo + 1)) ) {
+          throw new IOException("BlockReader: error in packet header" +
+                                "(chunkOffset : " + chunkOffset +
+                                ", dataLen : " + dataLen +
+                                ", seqno : " + seqno +
+                                " (last: " + lastSeqNo + "))");
+        }
+
+        lastSeqNo = seqno;
+        isLastPacket = lastPacketInBlock;
+        dataLeft = dataLen;
+        adjustChecksumBytes(dataLen);
+        if (dataLen > 0) {
+          IOUtils.readFully(in, checksumBytes.array(), 0,
+                            checksumBytes.limit());
+        }
       }
 
+      int chunkLen = Math.min(dataLeft, bytesPerChecksum);
+
       if ( chunkLen > 0 ) {
         // len should be >= chunkLen
         IOUtils.readFully(in, buf, offset, chunkLen);
+        checksumBytes.get(checksumBuf, 0, checksumSize);
       }
 
-      if ( checksumSize > 0 ) {
-        IOUtils.readFully(in, checksumBuf, 0, checksumSize);
-      }
-
+      dataLeft -= chunkLen;
       lastChunkOffset = chunkOffset;
       lastChunkLen = chunkLen;
 
-      if ( chunkLen == 0 ) {
+      if ((dataLeft == 0 && isLastPacket) || chunkLen == 0) {
         gotEOS = true;
+      }
+      if ( chunkLen == 0 ) {
         return -1;
       }
@@ -827,7 +865,7 @@
             new BufferedOutputStream(sock.getOutputStream()));
 
       //write the header.
-      out.writeShort( DATA_TRANFER_VERSION );
+      out.writeShort( DATA_TRANSFER_VERSION );
       out.write( OP_READ_BLOCK );
       out.writeLong( blockId );
       out.writeLong( startOffset );
@@ -2030,7 +2068,7 @@
           DataOutputStream out = new DataOutputStream(
               new BufferedOutputStream(s.getOutputStream(), buffersize));
           blockReplyStream = new DataInputStream(s.getInputStream());
-          out.writeShort( DATA_TRANFER_VERSION );
+          out.writeShort( DATA_TRANSFER_VERSION );
           out.write( OP_WRITE_BLOCK );
           out.writeLong( block.getBlockId() );
           out.writeInt( nodes.length );
@@ -2147,8 +2185,8 @@
       }
 
       currentPacket.writeInt(len);
-      currentPacket.write(b, offset, len);
       currentPacket.write(checksum, 0, cklen);
+      currentPacket.write(b, offset, len);
       currentPacket.numChunks++;
       bytesCurBlock += len;
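The last DFSClient hunk above moves the data write after the checksum write, so checksum bytes now precede the data they cover inside a packet body, matching the non-interleaved layout the DataNode expects. A minimal sketch of assembling one such packet body; the class and method names are illustrative, not taken from DFSClient:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class PacketBodySketch {
      static byte[] packetBody(byte[] data, byte[] checksums) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream pkt = new DataOutputStream(bos);
        pkt.writeInt(data.length);                  // 4-byte data length
        pkt.write(checksums, 0, checksums.length);  // all checksum bytes first ...
        pkt.write(data, 0, data.length);            // ... then the data itself
        pkt.flush();
        return bos.toByteArray();
      }
    }
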
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Mar 3 13:40:18 2008
@@ -37,6 +37,7 @@
 
 import java.io.*;
 import java.net.*;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.security.NoSuchAlgorithmException;
@@ -450,16 +451,6 @@
     }
   }
 
-  private void enumerateThreadGroup(ThreadGroup tg) {
-    int count = tg.activeCount();
-    Thread[] info = new Thread[count];
-    int num = tg.enumerate(info);
-    for (int i = 0; i < num; i++) {
-      System.out.print(info[i].getName() + " ");
-    }
-    System.out.println("");
-  }
-
   /**
    * Shut down this instance of the datanode.
    * Returns only after shutdown is complete.
@@ -937,7 +928,7 @@
         in = new DataInputStream(
             new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
         short version = in.readShort();
-        if ( version != DATA_TRANFER_VERSION ) {
+        if ( version != DATA_TRANSFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
         }
         boolean local = s.getInetAddress().equals(s.getLocalAddress());
@@ -1003,7 +994,7 @@
       // send the block
       DataOutputStream out = new DataOutputStream(
-          new BufferedOutputStream(s.getOutputStream(), BUFFER_SIZE));
+          new BufferedOutputStream(s.getOutputStream(), SMALL_BUFFER_SIZE));
       BlockSender blockSender = null;
       try {
         try {
@@ -1116,7 +1107,7 @@
           mirrorIn = new DataInputStream(mirrorSock.getInputStream());
 
           // Write header: Copied from DFSClient.java!
-          mirrorOut.writeShort( DATA_TRANFER_VERSION );
+          mirrorOut.writeShort( DATA_TRANSFER_VERSION );
           mirrorOut.write( OP_WRITE_BLOCK );
           mirrorOut.writeLong( block.getBlockId() );
           mirrorOut.writeInt( pipelineSize );
@@ -1269,11 +1260,11 @@
         targetSock.setSoTimeout(socketTimeout);
 
         targetOut = new DataOutputStream(new BufferedOutputStream(
-            targetSock.getOutputStream(), BUFFER_SIZE));
+            targetSock.getOutputStream(), SMALL_BUFFER_SIZE));
 
         /* send request to the target */
         // first write header info
-        targetOut.writeShort(DATA_TRANFER_VERSION); // transfer version
+        targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
         targetOut.writeByte(OP_REPLACE_BLOCK); // op code
         targetOut.writeLong(block.getBlockId()); // block id
         Text.writeString( targetOut, source); // del hint
@@ -1445,15 +1436,94 @@
     }
   }
 
+  /* ********************************************************************
+    Protocol when a client reads data from Datanode (Cur Ver: 9):
+
+    Client's Request :
+    =================
+
+    Processed in DataXceiver:
+    +----------------------------------------------+
+    | Common Header   | 1 byte OP == OP_READ_BLOCK |
+    +----------------------------------------------+
+
+    Processed in readBlock() :
+    +-------------------------------------------------------+
+    | 8 byte Block ID | 8 byte start offset | 8 byte length |
+    +-------------------------------------------------------+
+
+    Client sends optional response only at the end of receiving data.
+
+    DataNode Response :
+    ===================
+
+    In readBlock() :
+    If there is an error while initializing BlockSender :
+       +---------------------------+
+       | 2 byte OP_STATUS_ERROR    | and connection will be closed.
+       +---------------------------+
+    Otherwise
+       +---------------------------+
+       | 2 byte OP_STATUS_SUCCESS  |
+       +---------------------------+
+
+    Actual data, sent by BlockSender.sendBlock() :
+
+    ChecksumHeader :
+    +--------------------------------------------------+
+    | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
+    +--------------------------------------------------+
+    Followed by actual data in the form of PACKETS:
+    +------------------------------------+
+    | Sequence of data PACKETs ....      |
+    +------------------------------------+
+
+    A "PACKET" is defined further below.
+
+    The client reads data until it receives a packet with
+    "LastPacketInBlock" set to true or with a zero length. If there is
+    no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
+
+    Client optional response at the end of data transmission :
+      +------------------------------+
+      | 2 byte OP_STATUS_CHECKSUM_OK |
+      +------------------------------+
+
+    PACKET : Contains a packet header, checksum and data. Amount of data
+    ======== carried is set by BUFFER_SIZE.
+
+      +-----------------------------------------------------+
+      | 4 byte packet length (excluding packet header)      |
+      +-----------------------------------------------------+
+      | 8 byte offset in the block | 8 byte sequence number |
+      +-----------------------------------------------------+
+      | 1 byte isLastPacketInBlock                          |
+      +-----------------------------------------------------+
+      | 4 byte Length of actual data                        |
+      +-----------------------------------------------------+
+      | x byte checksum data. x is defined below            |
+      +-----------------------------------------------------+
+      | actual data ......                                  |
+      +-----------------------------------------------------+
+
+      x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+          CHECKSUM_SIZE
+
+    CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
+
+    The same packet format is also used while writing data to DFS;
+    not all of the fields are used while reading.
+
+   ************************************************************************ */
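As a reading aid, a minimal client-side parser for one PACKET in the format documented above. This is a sketch, not code from the patch: the stream setup is assumed, error handling is omitted, and bytesPerChecksum/checksumSize are taken from the ChecksumHeader sent before the packets:

    import java.io.DataInputStream;
    import java.io.IOException;

    class PacketReaderSketch {
      // Parses one data PACKET; returns true when the block is done.
      static boolean readPacket(DataInputStream in, int bytesPerChecksum,
                                int checksumSize) throws IOException {
        int packetLen = in.readInt();     // checksums + data + 4 (the len field)
        if (packetLen == 0) {
          return true;                    // a 4-byte zero marks end of block
        }
        long offsetInBlock = in.readLong();
        long seqno = in.readLong();
        boolean lastPacketInBlock = in.readBoolean();
        int dataLen = in.readInt();
        int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum;
        byte[] checksums = new byte[numChunks * checksumSize];
        in.readFully(checksums);          // the x checksum bytes defined above
        byte[] data = new byte[dataLen];
        in.readFully(data);               // then the actual data
        return lastPacketInBlock || dataLen == 0;
      }
    }
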
| + +-----------------------------------------------------+ + + x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM * + CHECKSUM_SIZE + + CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) + + The above packet format is used while writing data to DFS also. + Not all the fields might be used while reading. + + ************************************************************************ */ + class BlockSender implements java.io.Closeable { private Block block; // the block to read from - private DataInputStream blockIn; // data strean + private InputStream blockIn; // data stream private DataInputStream checksumIn; // checksum datastream private DataChecksum checksum; // checksum stream private long offset; // starting position to read private long endOffset; // ending position private long blockLength; - private byte buf[]; // buffer to store data read from the block file & crc private int bytesPerChecksum; // chunk size private int checksumSize; // checksum size private boolean corruptChecksumOk; // if need to verify checksum @@ -1463,8 +1533,14 @@ private boolean blockReadFully; //set when the whole block is read private boolean verifyChecksum; //if true, check is verified while reading private Throttler throttler; - private DataOutputStream out; - + private OutputStream out; + + static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */ + 8 + /* offset in block */ + 8 + /* seqno */ + 1 + /* isLastPacketInBlock */ + 4 /* data len */ ); + BlockSender(Block block, long startOffset, long length, boolean corruptChecksumOk, boolean chunkOffsetOK, boolean verifyChecksum) throws IOException { @@ -1511,7 +1587,7 @@ throw new IOException(msg); } - buf = new byte[bytesPerChecksum + checksumSize]; + offset = (startOffset - (startOffset % bytesPerChecksum)); if (length >= 0) { // Make sure endOffset points to end of a checksumed chunk. @@ -1535,8 +1611,7 @@ } seqno = 0; - InputStream blockInStream = data.getBlockInputStream(block, offset); // seek to offset - blockIn = new DataInputStream(new BufferedInputStream(blockInStream, BUFFER_SIZE)); + blockIn = data.getBlockInputStream(block, offset); // seek to offset } catch (IOException ioe) { IOUtils.closeStream(this); IOUtils.closeStream(blockIn); @@ -1571,26 +1646,37 @@ } } - - private int sendChunk() - throws IOException { - int len = (int) Math.min(endOffset - offset, bytesPerChecksum); + /** + * Sends upto maxChunks chunks of data. + */ + private int sendChunks(ByteBuffer pkt, int maxChunks) throws IOException { + // Sends multiple chunks in one packet with a single write(). + + int len = Math.min((int) (endOffset - offset), + bytesPerChecksum*maxChunks); if (len == 0) { return 0; } - blockIn.readFully(buf, 0, len); + int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum; + int packetLen = len + numChunks*checksumSize + 4; + pkt.clear(); + + // write packet header + pkt.putInt(packetLen); + pkt.putLong(offset); + pkt.putLong(seqno); + pkt.put((byte)((offset + len >= endOffset) ? 1 : 0)); + //why no ByteBuf.putBoolean()? 
+      pkt.putInt(len);
+
+      int checksumOff = pkt.position();
+      int checksumLen = numChunks * checksumSize;
+      byte[] buf = pkt.array();
+
       if (checksumSize > 0 && checksumIn != null) {
         try {
-          checksumIn.readFully(buf, len, checksumSize);
-
-          if (verifyChecksum) {
-            checksum.reset();
-            checksum.update(buf, 0, len);
-            if (!checksum.compare(buf, len)) {
-              throw new ChecksumException("Checksum failed at " + offset, len);
-            }
-          }
+          checksumIn.readFully(buf, checksumOff, checksumLen);
         } catch (IOException e) {
           LOG.warn(" Could not read or failed to verify checksum for data" +
                    " at offset " + offset + " for block " + block + " got : "
@@ -1599,28 +1685,39 @@
           checksumIn = null;
           if (corruptChecksumOk) {
             // Just fill the array with zeros.
-            Arrays.fill(buf, len, len + checksumSize, (byte) 0);
+            Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
           } else {
             throw e;
           }
         }
       }
 
-      boolean lastPacketInBlock = false;
-      if (offset + len >= endOffset) {
-        lastPacketInBlock = true;
+      int dataOff = checksumOff + checksumLen;
+      IOUtils.readFully(blockIn, buf, dataOff, len);
+
+      if (verifyChecksum) {
+        int dOff = dataOff;
+        int cOff = checksumOff;
+        int dLeft = len;
+
+        for (int i=0; i<numChunks; i++) {
+          checksum.reset();
+          int dLen = Math.min(dLeft, bytesPerChecksum);
+          checksum.update(buf, dOff, dLen);
+          if (!checksum.compare(buf, cOff)) {
+            throw new ChecksumException("Checksum failed at " +
                                        (offset + len - dLeft), len);
+          }
+          dLeft -= dLen;
+          dOff += dLen;
+          cOff += checksumSize;
+        }
+      }
+
+      out.write(buf, 0, dataOff + len);
+
+      return len;
+    }
@@ ... @@
+      int maxChunksPerPacket = Math.max(1,
+                 (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+      ByteBuffer pktBuf = ByteBuffer.allocate(PKT_HEADER_LEN +
+                 (bytesPerChecksum + checksumSize) * maxChunksPerPacket);
+
       while (endOffset > offset) {
-        // Write one data chunk per loop.
-        long len = sendChunk();
+        long len = sendChunks(pktBuf, maxChunksPerPacket);
         offset += len;
-        totalRead += len + checksumSize;
+        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
+                            checksumSize);
         seqno++;
       }
-      out.writeInt(0); // mark the end of block
+      out.writeInt(0); // mark the end of block
       out.flush();
     } finally {
       close();
@@ -1965,6 +2068,7 @@
     private int bytesPerChecksum;
     private int checksumSize;
     private byte buf[];
+    private byte checksumBuf[];
     private long offsetInBlock;
     final private String inAddr;
     private String mirrorAddr;
@@ -1995,6 +2099,7 @@
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
         this.buf = new byte[bytesPerChecksum + checksumSize];
+        this.checksumBuf = new byte[checksumSize];
         //
         // Open local disk out
         //
@@ -2055,7 +2160,8 @@
     }
 
     /* receive a chunk: write it to disk & mirror it to another stream */
-    private void receiveChunk( int len ) throws IOException {
+    private void receiveChunk( int len, byte[] checksumBuf, int checksumOff )
+                              throws IOException {
       if (len <= 0 || len > bytesPerChecksum) {
         throw new IOException("Got wrong length during writeBlock(" + block
             + ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
@@ -2071,7 +2177,7 @@
       lastLen = curLen;
       curLen = len;
 
-      in.readFully(buf, 0, len + checksumSize);
+      in.readFully(buf, 0, len);
 
       /*
        * Verification is not included in the initial design. For now, it at
@@ -2080,7 +2186,7 @@
        */
       checksum.update(buf, 0, len);
 
-      if (!checksum.compare(buf, len)) {
+      if (!checksum.compare(checksumBuf, checksumOff)) {
         throw new IOException("Unexpected checksum mismatch " +
                               "while writing " + block + " from " + inAddr);
       }
@@ -2097,7 +2203,8 @@
       if (mirrorOut != null) {
         try {
           mirrorOut.writeInt(len);
-          mirrorOut.write(buf, 0, len + checksumSize);
+          mirrorOut.write(checksumBuf, checksumOff, checksumSize);
+          mirrorOut.write(buf, 0, len);
         } catch (IOException ioe) {
           LOG.info(dnRegistration + ":Exception writing block " +
                    block + " to mirror " + mirrorAddr + "\n" +
@@ -2123,7 +2230,7 @@
         if (!finalized) {
           out.write(buf, 0, len);
           // Write checksum
-          checksumOut.write(buf, len, checksumSize);
+          checksumOut.write(checksumBuf, checksumOff, checksumSize);
           myMetrics.bytesWritten.inc(len);
         }
       } catch (IOException iex) {
@@ -2145,7 +2252,15 @@
      * Receive and process a packet. It contains many chunks.
      */
     private void receivePacket(int packetSize) throws IOException {
-
+      /* TEMP: Currently this handles both interleaved
+       * and non-interleaved DATA_CHUNKs inside the packet.
+       * non-interleaved is required for HADOOP-2758 and in future.
+       * interleaved will be removed once extra buffer copies are removed
+       * in write path (HADOOP-1702).
+       *
+       * Format of non-interleaved data packets is described in the
+       * comment before BlockSender.
+       */
       offsetInBlock = in.readLong(); // get offset of packet in block
       long seqno = in.readLong();    // get seqno
       boolean lastPacketInBlock = in.readBoolean();
@@ -2157,9 +2272,6 @@
                 " lastPacketInBlock " + lastPacketInBlock);
       setBlockPosition(offsetInBlock);
 
-      int len = in.readInt();
-      curPacketSize += 4;            // read an integer in previous line
-
       // send packet header to next datanode in pipeline
       if (mirrorOut != null) {
         try {
@@ -2189,6 +2301,9 @@
         }
       }
 
+      int len = in.readInt();
+      curPacketSize += 4;            // read an integer in previous line
+
       if (len == 0) {
         LOG.info("Receiving empty packet for block " + block);
         if (mirrorOut != null) {
@@ -2198,15 +2313,37 @@
       }
 
       while (len != 0) {
-        LOG.debug("Receiving one chunk for block " + block +
-                  " of size " + len);
-        receiveChunk( len );
-        curPacketSize += (len + checksumSize);
-        if (curPacketSize > packetSize) {
-          throw new IOException("Packet size for block " + block +
-                                " too long " + curPacketSize +
-                                " was expecting " + packetSize);
-        }
+        int checksumOff = 0;
+        if (len > 0) {
+          int checksumLen = (len + bytesPerChecksum - 1)/bytesPerChecksum*
+                            checksumSize;
+          if (checksumBuf.length < checksumLen) {
+            checksumBuf = new byte[checksumLen];
+          }
+          // read the checksum
+          in.readFully(checksumBuf, 0, checksumLen);
+        }
+
+        while (len != 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Receiving one chunk for block " + block +
+                      " of size " + len);
+          }
+
+          int toRecv = Math.min(len, bytesPerChecksum);
+
+          receiveChunk(toRecv, checksumBuf, checksumOff);
+
+          len -= toRecv;
+          checksumOff += checksumSize;
+          curPacketSize += (toRecv + checksumSize);
+          if (curPacketSize > packetSize) {
+            throw new IOException("Packet size for block " + block +
+                                  " too long " + curPacketSize +
+                                  " was expecting " + packetSize);
+          }
+        }
+
         if (curPacketSize == packetSize) {
           if (mirrorOut != null) {
             mirrorOut.flush();
@@ -2388,13 +2525,14 @@
         sock.setSoTimeout(targets.length * socketTimeout);
 
         out = new DataOutputStream(new BufferedOutputStream(
-            sock.getOutputStream(), BUFFER_SIZE));
+            sock.getOutputStream(), SMALL_BUFFER_SIZE));
+
         blockSender = new BlockSender(b, 0, -1, false, false, false);
 
         //
         // Header info
         //
-        out.writeShort(DATA_TRANFER_VERSION);
+        out.writeShort(DATA_TRANSFER_VERSION);
         out.writeByte(OP_WRITE_BLOCK);
         out.writeLong(b.getBlockId());
         out.writeInt(0); // no pipelining
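Both sender and receiver now verify a packet chunk-by-chunk against the checksum block at its front. A self-contained sketch of that loop, substituting java.util.zip.CRC32 for Hadoop's internal DataChecksum and assuming the 4-byte big-endian CRC32 layout that DataChecksum writes:

    import java.util.zip.CRC32;

    class ChunkVerifySketch {
      // data: packet data; sums: one 4-byte CRC32 per bytesPerChecksum chunk.
      static boolean verify(byte[] data, int dataLen, byte[] sums,
                            int bytesPerChecksum) {
        CRC32 crc = new CRC32();
        int cOff = 0;
        for (int dOff = 0; dOff < dataLen; dOff += bytesPerChecksum) {
          int chunkLen = Math.min(bytesPerChecksum, dataLen - dOff);
          crc.reset();
          crc.update(data, dOff, chunkLen);
          // Assemble the stored checksum (big-endian, an assumption here).
          int stored = ((sums[cOff] & 0xff) << 24) | ((sums[cOff + 1] & 0xff) << 16)
                     | ((sums[cOff + 2] & 0xff) << 8) | (sums[cOff + 3] & 0xff);
          if ((int) crc.getValue() != stored) {
            return false;            // mismatch in the chunk starting at dOff
          }
          cOff += 4;
        }
        return true;
      }
    }
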
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Mar 3 13:40:18 2008
@@ -100,21 +100,13 @@
    * This should change when serialization of DatanodeInfo, not just
    * when protocol changes. It is not very obvious.
    */
-  /* Version 7:
-   *    Add two operations to data node
-   *    OP_COPY_BLOCK:
-   *        The command is for sending to a proxy source for the balancing purpose
-   *        The datanode then sends OP_REPLACE_BLOCK request to the destination
-   *        OP_COPY_BLOCK BlockID(long) SourceID (UTF8) Destination (DatanodeInfo)
-   *        return OP_STATUS_ERROR if any error occurs; OP_STATUS_SUCCESS otherwise
-   *    OP_REPLACE_BLOCK:
-   *        the command is for sending to a destination for the balancing purpose
-   *        The datanode then writes the block to disk and notifies namenode of this
-   *        received block together with a deletion hint: sourceID
-   *        OP_REPLACE_BLOCK BlockID(long) SourceID(UTF8) Block_Data_With_Crc
-   *        return OP_STATUS_ERROR if any error occurs; OP_STATUS_SUCCESS otherwise
+  /*
+   * Version 9:
+   *    While reading data from Datanode, each PACKET can consist of
+   *    non-interleaved data (checksums for a larger amount of data,
+   *    followed by the data).
    */
-  public static final int DATA_TRANFER_VERSION = 8;
+  public static final int DATA_TRANSFER_VERSION = 9;
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;
@@ -140,6 +132,8 @@
   public static int MAX_PATH_DEPTH = 1000;
 
   public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
+  //Used for writing header etc.
+  static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
   //TODO mb@media-style.com: should be conf injected?
   public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
   public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
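The new SMALL_BUFFER_SIZE exists because header-only exchanges are tiny compared to BUFFER_SIZE. A sketch of the arithmetic for a read request; the 4096 default is io.file.buffer.size, and the field sizes come from the protocol comment above:

    class HeaderBufferSketch {
      public static void main(String[] args) {
        // An OP_READ_BLOCK request is 2 (version) + 1 (op) + 8 (block id)
        // + 8 (start offset) + 8 (length) = 27 bytes, so a full
        // io.file.buffer.size buffer is wasted on it.
        int bufferSize = 4096;                               // illustrative default
        int smallBufferSize = Math.min(bufferSize / 2, 512); // = 512 here
        System.out.println("header buffer: " + smallBufferSize + " bytes");
      }
    }
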
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java Mon Mar 3 13:40:18 2008
@@ -215,7 +215,7 @@
     sock.setSoTimeout(FSConstants.READ_TIMEOUT);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+    out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
     out.writeByte(FSConstants.OP_COPY_BLOCK);
     out.writeLong(block.getBlockId());
     Text.writeString(out, source.getStorageID());

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Mon Mar 3 13:40:18 2008
@@ -152,19 +152,19 @@
     sendBuf.reset();
 
     // bad version
-    recvOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
-    sendOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
+    recvOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
+    sendOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
     sendRecvData("Wrong Version", true);
 
     // bad ops
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)(FSConstants.OP_WRITE_BLOCK-1));
     sendRecvData("Wrong Op Code", true);
 
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(newBlockId); // block id
     sendOut.writeInt(0);           // targets in pipeline
@@ -181,7 +181,7 @@
 
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(newBlockId);
     sendOut.writeInt(0); // targets in pipeline
@@ -195,7 +195,7 @@
 
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(++newBlockId);
     sendOut.writeInt(0); // targets in pipeline
@@ -220,7 +220,7 @@
     // test for writing a valid zero size block
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(++newBlockId);
     sendOut.writeInt(0); // targets in pipeline
@@ -247,7 +247,7 @@
     // bad block id
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     newBlockId = firstBlock.getBlockId()-1;
     sendOut.writeLong(newBlockId);
@@ -258,7 +258,7 @@
 
     // negative block start offset
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(-1L);
@@ -268,7 +268,7 @@
 
     // bad block start offset
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(fileLen);
@@ -280,7 +280,7 @@
     recvBuf.reset();
     recvOut.writeShort((short)FSConstants.OP_STATUS_SUCCESS);
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(0);
@@ -292,7 +292,7 @@
     recvBuf.reset();
     recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(0);
@@ -302,7 +302,7 @@
 
     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(0);
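Tying the tests back to the protocol comment in DataNode.java: a reader drains packets until the zero-length terminator or a packet flagged as last, then may send the optional 2-byte OP_STATUS_CHECKSUM_OK. A compact sketch under those assumptions, reusing the PacketReaderSketch shown after the protocol comment (the status-code value is passed in rather than hard-coded, since it lives in FSConstants):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class ReadAckSketch {
      static void readAndAck(DataInputStream in, DataOutputStream out,
                             int bytesPerChecksum, int checksumSize,
                             short opStatusChecksumOk) throws IOException {
        // Keep reading until the last or zero-length packet arrives.
        while (!PacketReaderSketch.readPacket(in, bytesPerChecksum, checksumSize)) {
        }
        out.writeShort(opStatusChecksumOk);  // optional client response
        out.flush();
      }
    }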