To: hdfs-commits@hadoop.apache.org
From: tomwhite@apache.org
Subject: svn commit: r906690 [1/4] - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/aop/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/...
Date: Thu, 04 Feb 2010 22:20:03 -0000

Author: tomwhite
Date: Thu Feb 4 22:20:02 2010
New Revision: 906690

URL: http://svn.apache.org/viewvc?rev=906690&view=rev
Log:
HDFS-914. Refactor DFSOutputStream and DFSInputStream out of DFSClient. Contributed by Todd Lipcon.
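Note: since this is a pure refactoring, client code that reads through the public FileSystem API should be unaffected; the streams it returns are simply backed by the new top-level DFSInputStream/DFSOutputStream classes instead of inner classes of DFSClient. A rough sketch (the path and configuration below are made up for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ClientReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();            // assumes fs.default.name points at an HDFS cluster
        FileSystem fs = FileSystem.get(conf);                 // a DistributedFileSystem instance
        FSDataInputStream in = fs.open(new Path("/user/example/part-00000"));  // hypothetical path
        byte[] buf = new byte[8192];
        int n = in.read(buf);                                 // internally served by the new DFSInputStream
        System.out.println("read " + n + " bytes");
        in.close();
        fs.close();
      }
    }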
Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeDeath.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=906690&r1=906689&r2=906690&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu Feb 4 22:20:02 2010
@@ -54,6 +54,9 @@
     HDFS-844. Log the filename when file locking fails. (tomwhite)
 
+    HDFS-914. Refactor DFSOutputStream and DFSInputStream out of DFSClient.
+    (Todd Lipcon via tomwhite)
+
   OPTIMIZATIONS
 
   BUG FIXES

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=906690&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java Thu Feb 4 22:20:02 2010
@@ -0,0 +1,457 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.DataChecksum;
+
+/** This is a wrapper around the connection to a datanode
+ * and understands checksums, offsets etc.
+ */
+public class BlockReader extends FSInputChecker {
+
+  Socket dnSock; //for now just sending checksumOk.
+  private DataInputStream in;
+  private DataChecksum checksum;
+
+  /** offset in block of the last chunk received */
+  private long lastChunkOffset = -1;
+  private long lastChunkLen = -1;
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+
+  /** offset in block of first chunk - may be less than startOffset
+      if startOffset is not chunk-aligned */
+  private final long firstChunkOffset;
+
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private final long bytesNeededToFinish;
+
+  private boolean gotEOS = false;
+
+  byte[] skipBuf = null;
+  ByteBuffer checksumBytes = null;
+  int dataLeft = 0;
+
+  /* FSInputChecker interface */
+
+  /* same interface as inputStream java.io.InputStream#read()
+   * used by DFSInputStream#read()
+   * This violates one rule when there is a checksum error:
+   * "Read should not modify user buffer before successful read"
+   * because it first reads the data to user buffer and then checks
+   * the checksum.
+   */
+  @Override
+  public synchronized int read(byte[] buf, int off, int len)
+                               throws IOException {
+
+    // This has to be set here, *before* the skip, since we can
+    // hit EOS during the skip, in the case that our entire read
+    // is smaller than the checksum chunk.
+    boolean eosBefore = gotEOS;
+
+    //for the first read, skip the extra bytes at the front.
+    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+      // Skip these bytes. But don't call this.skip()!
+      int toSkip = (int)(startOffset - firstChunkOffset);
+      if ( skipBuf == null ) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+        // should never happen
+        throw new IOException("Could not skip required number of bytes");
+      }
+    }
+
+    int nRead = super.read(buf, off, len);
+
+    // if gotEOS was set in the previous read and checksum is enabled :
+    if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
+      //checksum is verified and there are no errors.
+      checksumOk(dnSock);
+    }
+    return nRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in the majority of cases? This one throws. */
+    if ( skipBuf == null ) {
+      skipBuf = new byte[bytesPerChecksum];
+    }
+
+    long nSkipped = 0;
+    while ( nSkipped < n ) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if ( ret <= 0 ) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
+  }
+
+  @Override
+  public int read() throws IOException {
+    throw new IOException("read() is not expected to be invoked. " +
+                          "Use read(buf, off, len) instead.");
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    /* Checksum errors are handled outside the BlockReader.
+     * DFSInputStream does not always call 'seekToNewSource'. In the
+     * case of pread(), it just tries a different replica without seeking.
+     */
+    return false;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throw new IOException("Seek() is not supported in BlockInputChecker");
+  }
+
+  @Override
+  protected long getChunkPosition(long pos) {
+    throw new RuntimeException("getChunkPosition() is not supported, " +
+                               "since seek is not required");
+  }
+
+  /**
+   * Makes sure that checksumBytes has enough capacity
+   * and limit is set to the number of checksum bytes needed
+   * to be read.
+   */
+  private void adjustChecksumBytes(int dataLen) {
+    int requiredSize =
+      ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+      checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
+    } else {
+      checksumBytes.clear();
+    }
+    checksumBytes.limit(requiredSize);
+  }
+
+  @Override
+  protected synchronized int readChunk(long pos, byte[] buf, int offset,
+                                       int len, byte[] checksumBuf)
+                                       throws IOException {
+    // Read one chunk.
+    if ( gotEOS ) {
+      // Already hit EOF
+      return -1;
+    }
+
+    // Read one DATA_CHUNK.
+    long chunkOffset = lastChunkOffset;
+    if ( lastChunkLen > 0 ) {
+      chunkOffset += lastChunkLen;
+    }
+
+    // pos is relative to the start of the first chunk of the read.
+    // chunkOffset is relative to the start of the block.
+    // This makes sure that the read passed from FSInputChecker is
+    // for the same chunk we expect to be reading from the DN.
+    if ( (pos + firstChunkOffset) != chunkOffset ) {
+      throw new IOException("Mismatch in pos : " + pos + " + " +
+                            firstChunkOffset + " != " + chunkOffset);
+    }
+
+    // Read next packet if the previous packet has been read completely.
+    if (dataLeft <= 0) {
+      //Read packet headers.
+      int packetLen = in.readInt();
+      long offsetInBlock = in.readLong();
+      long seqno = in.readLong();
+      boolean lastPacketInBlock = in.readBoolean();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("DFSClient readChunk got seqno " + seqno +
+                  " offsetInBlock " + offsetInBlock +
+                  " lastPacketInBlock " + lastPacketInBlock +
+                  " packetLen " + packetLen);
+      }
+
+      int dataLen = in.readInt();
+
+      // Sanity check the lengths
+      if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
+           ( dataLen != 0 && lastPacketInBlock) ||
+           (seqno != (lastSeqNo + 1)) ) {
+        throw new IOException("BlockReader: error in packet header" +
+                              "(chunkOffset : " + chunkOffset +
+                              ", dataLen : " + dataLen +
+                              ", seqno : " + seqno +
+                              " (last: " + lastSeqNo + "))");
+      }
+
+      lastSeqNo = seqno;
+      dataLeft = dataLen;
+      adjustChecksumBytes(dataLen);
+      if (dataLen > 0) {
+        IOUtils.readFully(in, checksumBytes.array(), 0,
+                          checksumBytes.limit());
+      }
+    }
+
+    // Sanity checks
+    assert len >= bytesPerChecksum;
+    assert checksum != null;
+    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+    int checksumsToRead, bytesToRead;
+
+    if (checksumSize > 0) {
+
+      // How many chunks left in our stream - this is a ceiling
+      // since we may have a partial chunk at the end of the file
+      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+      // How many chunks we can fit in databuffer
+      //  - note this is a floor since we always read full chunks
+      int chunksCanFit = Math.min(len / bytesPerChecksum,
+                                  checksumBuf.length / checksumSize);
+
+      // How many chunks should we read
+      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+      // How many bytes should we actually read
+      bytesToRead = Math.min(
+        checksumsToRead * bytesPerChecksum, // full chunks
+        dataLeft); // in case we have a partial
+    } else {
+      // no checksum
+      bytesToRead = Math.min(dataLeft, len);
+      checksumsToRead = 0;
+    }
+
+    if ( bytesToRead > 0 ) {
+      // Assert we have enough space
+      assert bytesToRead <= len;
+      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+      assert checksumBuf.length >= checksumSize * checksumsToRead;
+      IOUtils.readFully(in, buf, offset, bytesToRead);
+      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+    }
+
+    dataLeft -= bytesToRead;
+    assert dataLeft >= 0;
+
+    lastChunkOffset = chunkOffset;
+    lastChunkLen = bytesToRead;
+
+    // If there's no data left in the current packet after satisfying
+    // this read, and we have satisfied the client read, we expect
+    // an empty packet header from the DN to signify this.
+    // Note that pos + bytesToRead may in fact be greater since the
+    // DN finishes off the entire last chunk.
+    if (dataLeft == 0 &&
+        pos + bytesToRead >= bytesNeededToFinish) {
+
+      // Read header
+      int packetLen = in.readInt();
+      long offsetInBlock = in.readLong();
+      long seqno = in.readLong();
+      boolean lastPacketInBlock = in.readBoolean();
+      int dataLen = in.readInt();
+
+      if (!lastPacketInBlock ||
+          dataLen != 0) {
+        throw new IOException("Expected empty end-of-read packet! Header: " +
Header: " + + "(packetLen : " + packetLen + + ", offsetInBlock : " + offsetInBlock + + ", seqno : " + seqno + + ", lastInBlock : " + lastPacketInBlock + + ", dataLen : " + dataLen); + } + + gotEOS = true; + } + + if ( bytesToRead == 0 ) { + return -1; + } + + return bytesToRead; + } + + private BlockReader( String file, long blockId, DataInputStream in, + DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, + long bytesToRead, + Socket dnSock ) { + super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/, + 1, verifyChecksum, + checksum.getChecksumSize() > 0? checksum : null, + checksum.getBytesPerChecksum(), + checksum.getChecksumSize()); + + this.dnSock = dnSock; + this.in = in; + this.checksum = checksum; + this.startOffset = Math.max( startOffset, 0 ); + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read ends mid-chunk. + this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + + this.firstChunkOffset = firstChunkOffset; + lastChunkOffset = firstChunkOffset; + lastChunkLen = -1; + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + } + + public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken, + long genStamp, long startOffset, long len, int bufferSize) throws IOException { + return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize, + true); + } + + /** Java Doc required */ + public static BlockReader newBlockReader( Socket sock, String file, long blockId, + BlockAccessToken accessToken, + long genStamp, + long startOffset, long len, + int bufferSize, boolean verifyChecksum) + throws IOException { + return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, + len, bufferSize, verifyChecksum, ""); + } + + public static BlockReader newBlockReader( Socket sock, String file, + long blockId, + BlockAccessToken accessToken, + long genStamp, + long startOffset, long len, + int bufferSize, boolean verifyChecksum, + String clientName) + throws IOException { + // in and out will be closed when sock is closed (by the caller) + DataTransferProtocol.Sender.opReadBlock( + new DataOutputStream(new BufferedOutputStream( + NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))), + blockId, genStamp, startOffset, len, clientName, accessToken); + + // + // Get bytes in block, set streams + // + + DataInputStream in = new DataInputStream( + new BufferedInputStream(NetUtils.getInputStream(sock), + bufferSize)); + + DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in); + if (status != SUCCESS) { + if (status == ERROR_ACCESS_TOKEN) { + throw new InvalidAccessTokenException( + "Got access token error for OP_READ_BLOCK, self=" + + sock.getLocalSocketAddress() + ", remote=" + + sock.getRemoteSocketAddress() + ", for file " + file + + ", for block " + blockId + "_" + genStamp); + } else { + throw new IOException("Got error for OP_READ_BLOCK, self=" + + sock.getLocalSocketAddress() + ", remote=" + + sock.getRemoteSocketAddress() + ", for file " + file + + ", for block " + blockId + "_" + genStamp); + } + } + DataChecksum checksum = DataChecksum.newDataChecksum( in ); + //Warning when we get CHECKSUM_NULL? + + // Read the first chunk offset. 
+    long firstChunkOffset = in.readLong();
+
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+                            firstChunkOffset + ") startOffset is " +
+                            startOffset + " for file " + file);
+    }
+
+    return new BlockReader( file, blockId, in, checksum, verifyChecksum,
+                            startOffset, firstChunkOffset, len,
+                            sock );
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    startOffset = -1;
+    checksum = null;
+    // in will be closed when its Socket is closed.
+  }
+
+  /** kind of like readFully(). Only reads as much as possible.
+   * And allows use of protected readFully().
+   */
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return readFully(this, buf, offset, len);
+  }
+
+  /* When the reader reaches end of a block and there are no checksum
+   * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that
+   * checksum was verified and there was no error.
+   */
+  void checksumOk(Socket sock) {
+    try {
+      OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
+      CHECKSUM_OK.writeOutputStream(out);
+      out.flush();
+    } catch (IOException e) {
+      // it's ok not to be able to send this.
+      LOG.debug("Could not write to datanode " + sock.getInetAddress() +
+                ": " + e.getMessage());
+    }
+  }
+}
\ No newline at end of file
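
For reference, a minimal sketch (not part of the patch) of how the extracted BlockReader is expected to be driven, using only the factory and read methods added above. The datanode address, block id, generation stamp and access token are assumed to come from a located block as before; the buffer size and timeout values are illustrative only.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    import org.apache.hadoop.hdfs.BlockReader;
    import org.apache.hadoop.hdfs.security.BlockAccessToken;

    class BlockReaderSketch {
      /** Hypothetical helper: read 'len' bytes of one block replica into 'buf'. */
      static int readFromReplica(InetSocketAddress dnAddr, String file, long blockId,
                                 long genStamp, BlockAccessToken accessToken,
                                 long startOffset, long len, byte[] buf) throws IOException {
        Socket sock = new Socket();
        try {
          sock.connect(dnAddr, 60 * 1000);                 // illustrative connect timeout
          BlockReader reader = BlockReader.newBlockReader(
              sock, file, blockId, accessToken, genStamp, startOffset, len, 4096);
          int nRead = reader.readAll(buf, 0, (int) len);   // checksums verified; CHECKSUM_OK sent at EOS
          reader.close();
          return nRead;
        } finally {
          sock.close();
        }
      }
    }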