Subject: svn commit: r959555 - in /hadoop/common/branches/branch-0.20-append: ./ src/core/org/apache/hadoop/io/ src/core/org/apache/hadoop/util/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apach...
Date: Thu, 01 Jul 2010 08:37:30 -0000
From: dhruba@apache.org
To: common-commits@hadoop.apache.org

Author: dhruba
Date: Thu Jul 1 08:37:30 2010
New Revision: 959555

URL: http://svn.apache.org/viewvc?rev=959555&view=rev
Log:
HDFS-1057. Concurrent readers hit ChecksumExceptions if following a writer
to very end of file (Sam Rash via dhruba)
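For context: the race occurs when a reader follows a file that is still being
written and both touch the same partial checksum chunk. A minimal sketch of
that access pattern (hypothetical client code, not part of this commit; the
path and sizes are illustrative; sync() is the 0.20-append publish call):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TailWhileWriting {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path p = new Path("/tail-demo");       // illustrative path
        FSDataOutputStream out = fs.create(p);
        out.write(new byte[100]);              // partial 512-byte chunk
        out.sync();                            // flush so readers can see it
        FSDataInputStream in = fs.open(p);     // concurrent reader
        byte[] buf = new byte[100];
        in.readFully(0, buf);                  // pre-patch: could throw
        in.close();                            // ChecksumException while the
        out.close();                           // writer extends the same chunk
      }
    }
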
Added:
    hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
Modified:
    hadoop/common/branches/branch-0.20-append/CHANGES.txt
    hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Thu Jul 1 08:37:30 2010
@@ -19,6 +19,9 @@ Release 0.20-append - Unreleased
     HDFS-457. Better handling of volume failure in DataNode Storage.
     (Nicolas Spiegelberg via dhruba)
 
+    HDFS-1057. Concurrent readers hit ChecksumExceptions if following
+    a writer to very end of file (Sam Rash via dhruba)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java Thu Jul 1 08:37:30 2010
@@ -20,6 +20,8 @@ package org.apache.hadoop.io;
 
 import java.io.*;
 import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 
 import org.apache.commons.logging.Log;
 
@@ -105,6 +107,28 @@ public class IOUtils {
       off += ret;
     }
   }
+
+  /** Reads len bytes in a loop using the channel of the stream
+   * @param fileChannel a FileChannel to read len bytes into buf
+   * @param buf The buffer to fill
+   * @param off offset from the buffer
+   * @param len the length of bytes to read
+   * @throws IOException if it could not read requested number of bytes
+   * for any reason (including EOF)
+   */
+  public static void readFileChannelFully( FileChannel fileChannel, byte buf[],
+      int off, int len ) throws IOException {
+    int toRead = len;
+    ByteBuffer byteBuffer = ByteBuffer.wrap(buf, off, len);
+    while ( toRead > 0 ) {
+      int ret = fileChannel.read(byteBuffer);
+      if ( ret < 0 ) {
+        throw new IOException( "Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
 
   /** Similar to readFully(). Skips bytes in a loop.
   * @param in The InputStream to skip bytes from
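A quick usage sketch for the new helper (illustrative values; the block file
name is hypothetical). Unlike readFully(InputStream, ...), it reads through a
FileChannel positioned by the caller, which is what BlockSender needs further
down when it re-reads a chunk it is about to re-checksum:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import org.apache.hadoop.io.IOUtils;

    public class ReReadChunk {
      public static void main(String[] args) throws IOException {
        // hypothetical on-disk block file
        FileChannel ch = new FileInputStream("/tmp/blk_1234").getChannel();
        ch.position(4096);                   // seek to the chunk start
        byte[] buf = new byte[512];
        // throws IOException on premature EOF, like readFully()
        IOUtils.readFileChannelFully(ch, buf, 0, 512);
        ch.close();
      }
    }
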
Added: hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java?rev=959555&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java Thu Jul 1 08:37:30 2010
@@ -0,0 +1,40 @@
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+
+public class ChecksumUtil {
+  /**
+   * updates the checksum for a buffer
+   *
+   * @param buf - buffer to update the checksum in
+   * @param checksumOff - offset in the buffer where the checksum is to update
+   * @param dataOff - offset in the buffer of the data
+   * @param dataLen - length of data to compute checksum on
+   * @param checksum - the DataChecksum used to compute the chunk values
+   */
+  public static void updateChunkChecksum(
+    byte[] buf,
+    int checksumOff,
+    int dataOff,
+    int dataLen,
+    DataChecksum checksum
+  ) throws IOException {
+    int bytesPerChecksum = checksum.getBytesPerChecksum();
+    int checksumSize = checksum.getChecksumSize();
+    int curChecksumOff = checksumOff;
+    int curDataOff = dataOff;
+    int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum;
+    int dataLeft = dataLen;
+
+    for (int i = 0; i < numChunks; i++) {
+      int len = Math.min(dataLeft, bytesPerChecksum);
+
+      checksum.reset();
+      checksum.update(buf, curDataOff, len);
+      checksum.writeValue(buf, curChecksumOff, false);
+
+      curDataOff += len;
+      curChecksumOff += checksumSize;
+      dataLeft -= len;
+    }
+  }
+}
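To make the new utility concrete, a sketch of recomputing the CRC for one
partial chunk in a packet buffer laid out as [checksums][data] (sizes
assumed: 512-byte chunks, 4-byte CRC32; the class and buffer are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.util.ChecksumUtil;
    import org.apache.hadoop.util.DataChecksum;

    public class RecomputeChunkCrc {
      public static void main(String[] args) throws IOException {
        DataChecksum sum =
          DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
        int checksumOff = 0;                  // checksums at buffer start
        int dataOff = sum.getChecksumSize();  // one chunk -> one 4-byte CRC
        byte[] pkt = new byte[dataOff + 100]; // 100-byte partial chunk
        // ... fill pkt[dataOff .. dataOff+99] with the chunk's data ...
        ChecksumUtil.updateChunkChecksum(pkt, checksumOff, dataOff, 100, sum);
      }
    }
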
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Jul 1 08:37:30 2010
@@ -480,7 +480,10 @@ class BlockReceiver implements java.io.C
 
       /// flush entire packet before sending ack
       flush();
-
+
+      // update length only after flush to disk
+      datanode.data.setVisibleLength(block, offsetInBlock);
+
       // put in queue for pending acks
       if (responder != null) {
         ((PacketResponder)responder.getRunnable()).enqueue(seqno,
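The ordering in the hunk above is the heart of the fix on the write path:
the visible length advances only after flush(), so a reader is never told
about bytes or checksums that are not yet on disk. Roughly (comment sketch,
not committed code):

    // 1. receivePacket() writes data + checksum to the block/meta files
    // 2. flush()                       -- bytes and CRCs are now readable
    // 3. setVisibleLength(block, off)  -- only now may readers see them
    // A BlockSender that trusts getVisibleLength() therefore never serves
    // a chunk whose on-disk checksum is still being rewritten.
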
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Jul 1 08:37:30 2010
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.ChecksumUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
@@ -64,6 +65,7 @@ class BlockSender implements java.io.Clo
   private boolean verifyChecksum; //if true, checksum is verified while reading
   private BlockTransferThrottler throttler;
   private final String clientTraceFmt; // format of client trace log message
+  private final MemoizedBlock memoizedBlock;
 
   /**
    * Minimum buffer used while sending data to clients. Used only if
@@ -89,7 +91,7 @@ class BlockSender implements java.io.Clo
       this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
-      this.blockLength = datanode.data.getLength(block);
+      this.blockLength = datanode.data.getVisibleLength(block);
       this.transferToAllowed = datanode.transferToAllowed;
       this.clientTraceFmt = clientTraceFmt;
 
@@ -164,6 +166,7 @@ class BlockSender implements java.io.Clo
       seqno = 0;
 
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+      memoizedBlock = new MemoizedBlock(blockIn, blockLength, datanode.data, block);
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
       IOUtils.closeStream(blockIn);
@@ -234,6 +237,15 @@ class BlockSender implements java.io.Clo
 
     int len = Math.min((int) (endOffset - offset),
                        bytesPerChecksum*maxChunks);
+
+    // truncate len so that any partial chunks will be sent as a final packet.
+    // this is not necessary for correctness, but partial chunks are
+    // ones that may be recomputed and sent via buffer copy, so try to minimize
+    // those bytes
+    if (len > bytesPerChecksum && len % bytesPerChecksum != 0) {
+      len -= len % bytesPerChecksum;
+    }
+
    if (len == 0) {
      return 0;
    }
@@ -298,32 +310,54 @@ class BlockSender implements java.io.Clo
          cOff += checksumSize;
        }
      }
-      //writing is done below (mainly to handle IOException)
-    }
-
-    try {
-      if (blockInPosition >= 0) {
+
+      // only recompute checksum if we can't trust the meta data due to
+      // concurrent writes
+      if (memoizedBlock.hasBlockChanged(len)) {
+        ChecksumUtil.updateChunkChecksum(
+          buf, checksumOff, dataOff, len, checksum
+        );
+      }
+
+      try {
+        out.write(buf, 0, dataOff + len);
+      } catch (IOException e) {
+        throw ioeToSocketException(e);
+      }
+    } else {
+      try { //use transferTo(). Checks on out and blockIn are already done.
+        SocketOutputStream sockOut = (SocketOutputStream) out;
+        FileChannel fileChannel = ((FileInputStream) blockIn).getChannel();
-        SocketOutputStream sockOut = (SocketOutputStream)out;
-        //first write the packet
-        sockOut.write(buf, 0, dataOff);
-        // no need to flush. since we know out is not a buffered stream.
-
-        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
-                                blockInPosition, len);
+        if (memoizedBlock.hasBlockChanged(len)) {
+          fileChannel.position(blockInPosition);
+          IOUtils.readFileChannelFully(
+            fileChannel,
+            buf,
+            dataOff,
+            len
+          );
+
+          ChecksumUtil.updateChunkChecksum(
+            buf, checksumOff, dataOff, len, checksum
+          );
+          sockOut.write(buf, 0, dataOff + len);
+        } else {
+          //first write the packet
+          sockOut.write(buf, 0, dataOff);
+          // no need to flush. since we know out is not a buffered stream.
+          sockOut.transferToFully(fileChannel, blockInPosition, len);
+        }
 
        blockInPosition += len;
-      } else {
-        // normal transfer
-        out.write(buf, 0, dataOff + len);
-      }
-
-    } catch (IOException e) {
+
+      } catch (IOException e) {
        /* exception while writing to the client (well, with transferTo(),
         * it could also be while reading from the local file).
         */
-      throw ioeToSocketException(e);
+        throw ioeToSocketException(e);
+      }
    }
 
    if (throttler != null) { // rebalancing so throttle
@@ -382,12 +416,13 @@ class BlockSender implements java.io.Clo
        streamForSendChunks = baseStream;
 
        // assure a minimum buffer size.
-        maxChunksPerPacket = (Math.max(BUFFER_SIZE,
+        maxChunksPerPacket = (Math.max(BUFFER_SIZE,
                              MIN_BUFFER_WITH_TRANSFERTO)
                              + bytesPerChecksum - 1)/bytesPerChecksum;
 
-        // allocate smaller buffer while using transferTo().
-        pktSize += checksumSize * maxChunksPerPacket;
+        // packet buffer has to be able to do a normal transfer in the case
+        // of recomputing checksum
+        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
      } else {
        maxChunksPerPacket = Math.max(1,
                 (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
@@ -410,7 +445,13 @@ class BlockSender implements java.io.Clo
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }
-    } finally {
+    }
+    catch (RuntimeException e) {
+      LOG.error("unexpected exception sending block", e);
+
+      throw new IOException("unexpected runtime exception", e);
+    }
+    finally {
      if (clientTraceFmt != null) {
        ClientTraceLog.info(String.format(clientTraceFmt, totalRead));
      }
@@ -425,4 +466,49 @@ class BlockSender implements java.io.Clo
  boolean isBlockReadFully() {
    return blockReadFully;
  }
+
+  /**
+   * helper class used to track if a block's meta data is verifiable or not
+   */
+  private class MemoizedBlock {
+    // block data stream
+    private InputStream inputStream;
+    // visible block length
+    private long blockLength;
+    private final FSDatasetInterface fsDataset;
+    private final Block block;
+
+    private MemoizedBlock(
+      InputStream inputStream,
+      long blockLength,
+      FSDatasetInterface fsDataset, Block block) {
+      this.inputStream = inputStream;
+      this.blockLength = blockLength;
+      this.fsDataset = fsDataset;
+      this.block = block;
+    }
+
+    // logic: if we are starting or ending on a partial chunk and the block
+    // has more data than we were told at construction, the block has 'changed'
+    // in a way that we care about (i.e., we can't trust crc data)
+    boolean hasBlockChanged(long dataLen) throws IOException {
+      // check whether we are using transferTo, since that determines how we
+      // tell if the file has changed
+      // (blockInPosition >= 0 => we are using transferTo and FileChannels)
+      if (BlockSender.this.blockInPosition >= 0) {
+        long currentLength = ((FileInputStream) inputStream).getChannel().size();
+
+        return (blockInPosition % bytesPerChecksum != 0 ||
+          dataLen % bytesPerChecksum != 0) &&
+          currentLength > blockLength;
+
+      } else {
+        long currentLength = fsDataset.getLength(block);
+
+        // offset is the offset into the block
+        return (BlockSender.this.offset % bytesPerChecksum != 0 ||
+          dataLen % bytesPerChecksum != 0) &&
+          currentLength > blockLength;
+      }
+    }
+  }
 }
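The buffer-sizing change near the end of the BlockSender hunk above matters
for the packet-buffer-overrun test added later in this commit: with
transferTo() the packet buffer used to hold only checksums, but it must now
hold whole chunks whenever a checksum recompute forces a buffer-copy send.
A worked example (values assumed: bytesPerChecksum = 512, checksumSize = 4
for CRC32, and max(BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO) = 64 KB):

    int maxChunksPerPacket = (64 * 1024 + 512 - 1) / 512;  // = 128
    int oldExtra = 4 * 128;            // 512 bytes: room for checksums only
    int newExtra = (512 + 4) * 128;    // 66,048 bytes: chunks + checksums
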
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Jul 1 08:37:30 2010
@@ -709,14 +709,15 @@ public class FSDataset implements FSCons
   //Find better place?
   public static final String METADATA_EXTENSION = ".meta";
   public static final short METADATA_VERSION = 1;
-
+
   static class ActiveFile {
     final File file;
     final List<Thread> threads = new ArrayList<Thread>(2);
+    private volatile long visibleLength;
 
     ActiveFile(File f, List<Thread> list) {
-      file = f;
+      this(f);
       if (list != null) {
         threads.addAll(list);
       }
@@ -726,13 +727,22 @@ public class FSDataset implements FSCons
     // no active threads associated with this ActiveFile
     ActiveFile(File f) {
       file = f;
+      visibleLength = f.length();
     }
-
+
+    public long getVisibleLength() {
+      return visibleLength;
+    }
+
+    public void setVisibleLength(long value) {
+      visibleLength = value;
+    }
+
     public String toString() {
       return getClass().getSimpleName() + "(file=" + file
           + ", threads=" + threads + ")";
     }
-  }
+  }
 
   static String getMetaFileName(String blockFileName, long genStamp) {
     return blockFileName + "_" + genStamp + METADATA_EXTENSION;
@@ -782,7 +792,7 @@ public class FSDataset implements FSCons
   }
 
   /** Return the block file for the given ID */
-  public File findBlockFile(long blockId) {
+  public synchronized File findBlockFile(long blockId) {
     final Block b = new Block(blockId);
     File blockfile = null;
     ActiveFile activefile = ongoingCreates.get(b);
@@ -808,7 +818,8 @@ public class FSDataset implements FSCons
       return null;
     }
     File metafile = findMetaFile(blockfile);
-    return new Block(blkid, blockfile.length(),
+    Block block = new Block(blkid);
+    return new Block(blkid, getVisibleLength(block),
         parseGenerationStamp(blockfile, metafile));
   }
 
@@ -883,6 +894,31 @@ public class FSDataset implements FSCons
     return getBlockFile(b).length();
   }
 
+  @Override
+  public synchronized long getVisibleLength(Block b) throws IOException {
+    ActiveFile activeFile = ongoingCreates.get(b);
+
+    if (activeFile != null) {
+      return activeFile.getVisibleLength();
+    } else {
+      return getLength(b);
+    }
+  }
+
+  @Override
+  public synchronized void setVisibleLength(Block b, long length)
+    throws IOException {
+    ActiveFile activeFile = ongoingCreates.get(b);
+
+    if (activeFile != null) {
+      activeFile.setVisibleLength(length);
+    } else {
+      throw new IOException(
+        String.format("block %s is not being written to", b)
+      );
+    }
+  }
+
   /**
    * Get File name for a given block.
   */
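A note on the ActiveFile change above: visibleLength is declared volatile so
its value is safely published across threads even if it is ever read outside
the dataset lock (the receiver thread advances it after flush; sender threads
consume it). The intended semantics of the accessors, as a sketch:

    // while the block is open (in ongoingCreates):
    //   getVisibleLength(b) == ActiveFile.visibleLength
    // once the block is finalized:
    //   getVisibleLength(b) falls back to getLength(b), the file length
    // setVisibleLength(b, n) on a finalized block throws IOException
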
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Jul 1 08:37:30 2010
@@ -40,7 +40,7 @@ import org.apache.hadoop.util.DiskChecke
  *
  */
 public interface FSDatasetInterface extends FSDatasetMBean {
-  
+
   
   /**
    * Returns the length of the metadata file of the specified block
@@ -49,7 +49,7 @@ public interface FSDatasetInterface exte
    * @throws IOException
    */
   public long getMetaDataLength(Block b) throws IOException;
-  
+
   /**
    * This class provides the input stream and length of the metadata
    * of a block
@@ -94,6 +94,25 @@ public interface FSDatasetInterface exte
   public long getLength(Block b) throws IOException;
 
   /**
+   * Returns the specified block's visible length (has metadata for this)
+   * @param b
+   * @return the specified block's visible length
+   * @throws IOException
+   */
+  public long getVisibleLength(Block b) throws IOException;
+
+  /**
+   * update the specified block's visible metadata. NOTE: only applies
+   * to blocks that are being written to. If called on closed blocks,
+   * throws IOException
+   *
+   * @param b block to update the length for
+   * @param length value to set visible length to
+   * @throws IOException if the block is not in ongoingCreates
+   */
+  public void setVisibleLength(Block b, long length) throws IOException;
+
+  /**
    * @return the generation stamp stored with the block.
    */
   public Block getStoredBlock(long blkid) throws IOException;
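For implementers of FSDatasetInterface, the caller-side contract of the two
new methods, as a hypothetical snippet mirroring how BlockSender and
BlockReceiver use them elsewhere in this commit:

    // BlockSender: size the transfer by what readers may safely see
    long safeLength = datanode.data.getVisibleLength(block);
    // BlockReceiver: publish more bytes, but only while the block is open;
    // throws IOException once the block has been finalized
    datanode.data.setVisibleLength(block, offsetInBlock);
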
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Jul 1 08:37:30 2010
@@ -831,18 +831,28 @@ public class FSNamesystem implements FSC
                                 blocks[curBlk] + "blockMap has " + numCorruptNodes +
                                 " but corrupt replicas map has " + numCorruptReplicas);
       }
-      boolean blockCorrupt = (numCorruptNodes == numNodes);
-      int numMachineSet = blockCorrupt ? numNodes :
+      DatanodeDescriptor[] machineSet = null;
+      boolean blockCorrupt = false;
+      if (inode.isUnderConstruction() && curBlk == blocks.length - 1
+          && blocksMap.numNodes(blocks[curBlk]) == 0) {
+        // get unfinished block locations
+        INodeFileUnderConstruction cons = (INodeFileUnderConstruction)inode;
+        machineSet = cons.getTargets();
+        blockCorrupt = false;
+      } else {
+        blockCorrupt = (numCorruptNodes == numNodes);
+        int numMachineSet = blockCorrupt ? numNodes :
                             (numNodes - numCorruptNodes);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
-      if (numMachineSet > 0) {
-        numNodes = 0;
-        for(Iterator<DatanodeDescriptor> it =
-            blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          DatanodeDescriptor dn = it.next();
-          boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
-          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
-            machineSet[numNodes++] = dn;
+        machineSet = new DatanodeDescriptor[numMachineSet];
+        if (numMachineSet > 0) {
+          numNodes = 0;
+          for(Iterator<DatanodeDescriptor> it =
+              blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
+            DatanodeDescriptor dn = it.next();
+            boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
+            if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
+              machineSet[numNodes++] = dn;
+          }
         }
       }
       results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
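Condensing the FSNamesystem hunk above (illustrative fragment;
filterCorruptReplicas is a hypothetical stand-in for the replica-filtering
loop): a last block under construction with no reported replicas is now
served from the write pipeline instead of an empty, and falsely "corrupt",
location list.

    boolean lastAndUnreported = inode.isUnderConstruction()
        && curBlk == blocks.length - 1
        && blocksMap.numNodes(blocks[curBlk]) == 0;
    DatanodeDescriptor[] machineSet = lastAndUnreported
        ? ((INodeFileUnderConstruction) inode).getTargets() // write pipeline
        : filterCorruptReplicas(blocks[curBlk]);            // hypothetical helper
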
Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java?rev=959555&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java Thu Jul 1 08:37:30 2010
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+
+/**
+ * This class tests cases of concurrent reads/writes to a file;
+ * i.e., one writer and one or more readers can see unfinished blocks
+ */
+public class TestFileConcurrentReader extends junit.framework.TestCase {
+
+  private enum SyncType
+  {
+    SYNC,
+    APPEND,
+  }
+
+
+  private static final Logger LOG =
+    Logger.getLogger(TestFileConcurrentReader.class);
+
+  {
+    ((Log4JLogger) LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  private static final int DEFAULT_WRITE_SIZE = 1024 + 1;
+  private static final int SMALL_WRITE_SIZE = 61;
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FileSystem fileSystem;
+
+  // creates a file but does not close it
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    System.out.println("createFile: Created " + name + " with " + repl + " replica.");
+    FSDataOutputStream stm = fileSys.create(name, true,
+      fileSys.getConf().getInt("io.file.buffer.size", 4096),
+      (short) repl, (long) blockSize);
+    return stm;
+  }
+
+  @Before
+  protected void setUp() throws IOException {
+    conf = new Configuration();
+    init(conf);
+  }
+
+  private void init(Configuration conf) throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitClusterUp();
+    fileSystem = cluster.getFileSystem();
+  }
+
+  private void writeFileAndSync(FSDataOutputStream stm, int size)
+    throws IOException {
+//    byte[] buffer = AppendTestUtil.randomBytes(seed, size);
+    byte[] buffer = generateSequentialBytes(0, size);
+    stm.write(buffer, 0, size);
+    stm.sync();
+  }
+
+  private void checkCanRead(FileSystem fileSys, Path path, int numBytes)
+    throws IOException {
+    waitForBlocks(fileSys, path);
+    assertBytesAvailable(fileSys, path, numBytes);
+  }
+
+  // make sure bytes are available and match expected
+  private void assertBytesAvailable(
+    FileSystem fileSystem,
+    Path path,
+    int numBytes
+  ) throws IOException {
+    byte[] buffer = new byte[numBytes];
+    FSDataInputStream inputStream = fileSystem.open(path);
+    IOUtils.readFully(inputStream, buffer, 0, numBytes);
+
+    assertTrue(
+      "unable to validate bytes",
+      validateSequentialBytes(buffer, 0, numBytes)
+    );
+  }
+
+  private void waitForBlocks(FileSystem fileSys, Path name)
+    throws IOException {
+    // wait until we have at least one block in the file to read.
+    boolean done = false;
+
+    while (!done) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+      done = true;
+      BlockLocation[] locations = fileSys.getFileBlockLocations(
+        fileSys.getFileStatus(name), 0, blockSize);
+      if (locations.length < 1) {
+        done = false;
+        continue;
+      }
+    }
+  }
+
+  /**
+   * Test that writes to an incomplete block are available to a reader
+   */
+  public void testUnfinishedBlockRead()
+    throws IOException {
+    try {
+      // check that / exists
+      Path path = new Path("/");
+      System.out.println("Path : \"" + path.toString() + "\"");
+      System.out.println(fileSystem.getFileStatus(path).isDir());
+      assertTrue("/ should be a directory",
+        fileSystem.getFileStatus(path).isDir());
+
+      // create a new file in the root, write data, do not close
+      Path file1 = new Path("/unfinished-block");
+      FSDataOutputStream stm = TestFileCreation.createFile(fileSystem, file1, 1);
+
+      // write partial block and sync
+      int partialBlockSize = blockSize / 2;
+      writeFileAndSync(stm, partialBlockSize);
+
+      // Make sure a client can read it before it is closed
+      checkCanRead(fileSystem, file1, partialBlockSize);
+
+      stm.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * test case: if the BlockSender decides there is only one packet to send,
+   * the previous computation of the pktSize based on transferToAllowed
+   * would result in too small a buffer to do the buffer-copy needed
+   * for partial chunks.
+   */
+  public void testUnfinishedBlockPacketBufferOverrun() throws IOException {
+    try {
+      // check that / exists
+      Path path = new Path("/");
+      System.out.println("Path : \"" + path.toString() + "\"");
+      System.out.println(fileSystem.getFileStatus(path).isDir());
+      assertTrue("/ should be a directory",
+        fileSystem.getFileStatus(path).isDir());
+
+      // create a new file in the root, write data, do not close
+      Path file1 = new Path("/unfinished-block");
+      final FSDataOutputStream stm =
+        TestFileCreation.createFile(fileSystem, file1, 1);
+
+      // write partial block and sync
+      final int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
+      final int partialBlockSize = bytesPerChecksum - 1;
+
+      writeFileAndSync(stm, partialBlockSize);
+
+      // Make sure a client can read it before it is closed
+      checkCanRead(fileSystem, file1, partialBlockSize);
+
+      stm.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  // for some reason, using transferTo evokes the race condition more often
+  // so test separately
+  public void testUnfinishedBlockCRCErrorTransferTo() throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorTransferToVerySmallWrite()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.SYNC, SMALL_WRITE_SIZE);
+  }
+
+  // fails due to issue w/append, disable
+  public void _testUnfinishedBlockCRCErrorTransferToAppend() throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorNormalTransfer() throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorNormalTransferVerySmallWrite()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.SYNC, SMALL_WRITE_SIZE);
+  }
+
+  // fails due to issue w/append, disable
+  public void _testUnfinishedBlockCRCErrorNormalTransferAppend()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+  }
+
+  private void runTestUnfinishedBlockCRCError(
+    final boolean transferToAllowed, SyncType syncType, int writeSize
+  ) throws IOException {
+    runTestUnfinishedBlockCRCError(
+      transferToAllowed, syncType, writeSize, new Configuration()
+    );
+  }
+
+  private void runTestUnfinishedBlockCRCError(
+    final boolean transferToAllowed,
+    final SyncType syncType,
+    final int writeSize,
+    Configuration conf
+  ) throws IOException {
+    try {
+      conf.setBoolean("dfs.support.append", syncType == SyncType.APPEND);
+      conf.setBoolean("dfs.datanode.transferTo.allowed", transferToAllowed);
+      init(conf);
+
+      final Path file = new Path("/block-being-written-to");
+      final int numWrites = 2000;
+      final AtomicBoolean writerDone = new AtomicBoolean(false);
+      final AtomicBoolean writerStarted = new AtomicBoolean(false);
+      final AtomicBoolean error = new AtomicBoolean(false);
+      final FSDataOutputStream initialOutputStream = fileSystem.create(file);
+      Thread writer = new Thread(new Runnable() {
+        private FSDataOutputStream outputStream = initialOutputStream;
+
+        @Override
+        public void run() {
+          try {
+            for (int i = 0; !error.get() && i < numWrites; i++) {
+              try {
+                final byte[] writeBuf =
+                  generateSequentialBytes(i * writeSize, writeSize);
+                outputStream.write(writeBuf);
+                if (syncType == SyncType.SYNC) {
+                  outputStream.sync();
+                } else { // append
+                  outputStream.close();
+                  outputStream = fileSystem.append(file);
+                }
+                writerStarted.set(true);
+              } catch (IOException e) {
+                error.set(true);
+                LOG.error(String.format("error writing to file"));
+              }
+            }
+
+            outputStream.close();
+            writerDone.set(true);
+          } catch (Exception e) {
+            LOG.error("error in writer", e);
+
+            throw new RuntimeException(e);
+          }
+        }
+      });
+      Thread tailer = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            long startPos = 0;
+            while (!writerDone.get() && !error.get()) {
+              if (writerStarted.get()) {
+                try {
+                  startPos = tailFile(file, startPos);
+                } catch (IOException e) {
+                  LOG.error(String.format("error tailing file %s", file), e);
+
+                  throw new RuntimeException(e);
+                }
+              }
+            }
+          } catch (RuntimeException e) {
+            if (e.getCause() instanceof ChecksumException) {
+              error.set(true);
+            }
+
+            LOG.error("error in tailer", e);
+            throw e;
+          }
+        }
+      });
+
+      writer.start();
+      tailer.start();
+
+      try {
+        writer.join();
+        tailer.join();
+
+        assertFalse(
+          "error occurred, see log above", error.get()
+        );
+      } catch (InterruptedException e) {
+        LOG.info("interrupted waiting for writer or tailer to complete");
+
+        Thread.currentThread().interrupt();
+      }
+      initialOutputStream.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private byte[] generateSequentialBytes(int start, int length) {
+    byte[] result = new byte[length];
+
+    for (int i = 0; i < length; i++) {
+      result[i] = (byte) ((start + i) % 127);
+    }
+
+    return result;
+  }
+
+  private boolean validateSequentialBytes(byte[] buf, int startPos, int len) {
+    for (int i = 0; i < len; i++) {
+      int expected = (i + startPos) % 127;
+
+      if (buf[i] % 127 != expected) {
+        LOG.error(String.format("at position [%d], got [%d] and expected [%d]",
+          startPos, buf[i], expected));
+
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private long tailFile(Path file, long startPos) throws IOException {
+    long numRead = 0;
+    FSDataInputStream inputStream = fileSystem.open(file);
+    inputStream.seek(startPos);
+
+    int len = 4 * 1024;
+    byte[] buf = new byte[len];
+    int read;
+    while ((read = inputStream.read(buf)) > -1) {
+      LOG.info(String.format("read %d bytes", read));
+
+      if (!validateSequentialBytes(buf, (int) (startPos + numRead), read)) {
+        LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
+        throw new ChecksumException(
+          String.format("unable to validate bytes"),
+          startPos
+        );
+      }
+
+      numRead += read;
+    }
+
+    inputStream.close();
+    return numRead + startPos - 1;
+  }
+
+  public void _testClientReadsPastBlockEnd() throws IOException {
+    // todo : client has length greater than server--should handle with
+    // exception, not with corrupt data!!!
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Jul 1 08:37:30 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -66,6 +67,7 @@ public class SimulatedFSDataset impleme
   Configuration conf = null;
 
   static byte[] nullCrcFileData;
+
   {
     DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.
                               CHECKSUM_NULL, 16*1024 );
@@ -325,6 +327,16 @@ public class SimulatedFSDataset impleme
     return binfo.getlength();
   }
 
+  @Override
+  public long getVisibleLength(Block b) throws IOException {
+    return getLength(b);
+  }
+
+  @Override
+  public void setVisibleLength(Block b, long length) throws IOException {
+    //no-op
+  }
+
   /** {@inheritDoc} */
   public Block getStoredBlock(long blkid) throws IOException {
     Block b = new Block(blkid);
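To exercise the new test in a branch-0.20-append checkout, the usual route is
ant (typically ant test -Dtestcase=TestFileConcurrentReader, assuming the
stock build.xml test target); a plain JUnit 3 runner also works, sketched
here under the assumption that the 0.20 test classpath is available:

    // Minimal standalone runner for the new test (illustrative class name)
    public class RunConcurrentReaderTest {
      public static void main(String[] args) {
        junit.textui.TestRunner.run(
            org.apache.hadoop.hdfs.TestFileConcurrentReader.class);
      }
    }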