Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm
Reply-To: hdfs-dev@hadoop.apache.org
Subject: svn commit: r959324 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date: Wed, 30 Jun 2010 14:21:03 -0000
To: hdfs-commits@hadoop.apache.org
From: hairong@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20100630142104.34D9023888EA@eris.apache.org>

Author: hairong
Date: Wed Jun 30 14:21:03 2010
New Revision: 959324

URL: http://svn.apache.org/viewvc?rev=959324&view=rev
Log:
HDFS-1057 Concurrent readers hit ChecksumExceptions if following a writer to very end of file. Contributed by Sam Rash.

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ChunkChecksum.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=959324&r1=959323&r2=959324&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Jun 30 14:21:03 2010
@@ -1004,6 +1004,9 @@ Release 0.21.0 - Unreleased
 
     HDFS-1256. libhdfs is missing from the tarball. (tomwhite)
 
+    HDFS-1057. Concurrent readers hit ChecksumExceptions if following a
+    writer to very end of file. (sam rash via hairong)
+
 Release 0.20.3 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=959324&r1=959323&r2=959324&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Jun 30 14:21:03 2010
@@ -36,7 +36,9 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
@@ -139,6 +141,8 @@ class DFSInputStream extends FSInputStre
     if (locatedblock == null || locatedblock.getLocations().length == 0) {
       return 0;
     }
+    int replicaNotFoundCount = locatedblock.getLocations().length;
+
     for(DatanodeInfo datanode : locatedblock.getLocations()) {
       try {
         final ClientDatanodeProtocol cdp = DFSClient.createClientDatanodeProtocolProxy(
@@ -149,12 +153,28 @@ class DFSInputStream extends FSInputStre
         }
       } catch(IOException ioe) {
+        if (ioe instanceof RemoteException &&
+            (((RemoteException) ioe).unwrapRemoteException() instanceof
+                ReplicaNotFoundException)) {
+          // special case : replica might not be on the DN, treat as 0 length
+          replicaNotFoundCount--;
+        }
+
         if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Faild to getReplicaVisibleLength from datanode "
+          DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode "
               + datanode + " for block " + locatedblock.getBlock(), ioe);
         }
       }
     }
+
+    // Namenode told us about these locations, but none know about the replica
+    // means that we hit the race between pipeline creation start and end.
+    // we require all 3 because some other exception could have happened
+    // on a DN that has it. we want to report that error
+    if (replicaNotFoundCount == 0) {
+      return 0;
+    }
+
     throw new IOException("Cannot obtain block length for " + locatedblock);
   }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=959324&r1=959323&r2=959324&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Jun 30 14:21:03 2010
@@ -28,6 +28,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.zip.Checksum;
@@ -509,6 +510,8 @@ class BlockReceiver implements java.io.C
       verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
     }
 
+    byte[] lastChunkChecksum;
+
     try {
       long onDiskLen = replicaInfo.getBytesOnDisk();
       if (onDiskLen errorMessage = new AtomicReference();
+    final FSDataOutputStream out = fileSystem.create(file);
+
+    final Thread writer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (!openerDone.get()) {
+            out.write(DFSTestUtil.generateSequentialBytes(0, writeSize));
+            out.hflush();
+          }
+        } catch (IOException e) {
+          LOG.warn("error in writer", e);
+        } finally {
+          try {
+            out.close();
+          } catch (IOException e) {
+            LOG.error("unable to close file");
+          }
+        }
+      }
+    });
+
+    Thread opener = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; i < requiredSuccessfulOpens; i++) {
+            fileSystem.open(file).close();
+          }
+          openerDone.set(true);
+        } catch (IOException e) {
+          openerDone.set(true);
+          errorMessage.set(String.format(
+            "got exception : %s",
+            StringUtils.stringifyException(e)
+          ));
+        } catch (Exception e) {
+          openerDone.set(true);
+          errorMessage.set(String.format(
+            "got exception : %s",
+            StringUtils.stringifyException(e)
+          ));
+          writer.interrupt();
+          fail("here");
+        }
+      }
+    });
+
+    writer.start();
+    opener.start();
+
+    try {
+      writer.join();
+      opener.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    assertNull(errorMessage.get(), errorMessage.get());
+  }
+
+  // for some reason, using tranferTo evokes the race condition more often
+  // so test separately
+  public void testUnfinishedBlockCRCErrorTransferTo() throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorTransferToVerySmallWrite()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.SYNC, SMALL_WRITE_SIZE);
+  }
+
+  // fails due to issue w/append, disable
+  public void _testUnfinishedBlockCRCErrorTransferToAppend()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorNormalTransfer() throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorNormalTransferVerySmallWrite()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.SYNC, SMALL_WRITE_SIZE);
+  }
+
+  // fails due to issue w/append, disable
+  public void _testUnfinishedBlockCRCErrorNormalTransferAppend()
+    throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+  }
+
+  private void runTestUnfinishedBlockCRCError(
+    final boolean transferToAllowed, SyncType syncType, int writeSize
+  ) throws IOException {
+    runTestUnfinishedBlockCRCError(
+      transferToAllowed, syncType, writeSize, new Configuration()
+    );
+  }
+
+  private void runTestUnfinishedBlockCRCError(
+    final boolean transferToAllowed,
+    final SyncType syncType,
+    final int writeSize,
+    Configuration conf
+  ) throws IOException {
+    conf.setBoolean("dfs.support.append", syncType == SyncType.APPEND);
+    conf.setBoolean("dfs.datanode.transferTo.allowed", transferToAllowed);
+    init(conf);
+
+    final Path file = new Path("/block-being-written-to");
+    final int numWrites = 2000;
+    final AtomicBoolean writerDone = new AtomicBoolean(false);
+    final AtomicBoolean writerStarted = new AtomicBoolean(false);
+    final AtomicBoolean error = new AtomicBoolean(false);
+    final FSDataOutputStream initialOutputStream = fileSystem.create(file);
+    final Thread writer = new Thread(new Runnable() {
+      private FSDataOutputStream outputStream = initialOutputStream;
+
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; !error.get() && i < numWrites; i++) {
+            try {
+              final byte[] writeBuf =
+                DFSTestUtil.generateSequentialBytes(i * writeSize, writeSize);
+              outputStream.write(writeBuf);
+              if (syncType == SyncType.SYNC) {
+                outputStream.hflush();
+              } else { // append
+                outputStream.close();
+                outputStream = fileSystem.append(file);
+              }
+              writerStarted.set(true);
+            } catch (IOException e) {
+              error.set(true);
+              LOG.error("error writing to file", e);
+            }
+          }
+
+          writerDone.set(true);
+          outputStream.close();
+        } catch (Exception e) {
+          LOG.error("error in writer", e);
+
+          throw new RuntimeException(e);
+        }
+      }
+    });
+    Thread tailer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          long startPos = 0;
+          while (!writerDone.get() && !error.get()) {
+            if (writerStarted.get()) {
+              try {
+                startPos = tailFile(file, startPos);
+              } catch (IOException e) {
+                LOG.error(String.format("error tailing file %s", file), e);
+
+                throw new RuntimeException(e);
+              }
+            }
+          }
+        } catch (RuntimeException e) {
+          if (e.getCause() instanceof ChecksumException) {
+            error.set(true);
+          }
+
+          writer.interrupt();
+          LOG.error("error in tailer", e);
+          throw e;
+        }
+      }
+    });
+
+    writer.start();
+    tailer.start();
+
+    try {
+      writer.join();
+      tailer.join();
+
+      assertFalse(
+        "error occurred, see log above", error.get()
+      );
+    } catch (InterruptedException e) {
+      LOG.info("interrupted waiting for writer or tailer to complete");
+
+      Thread.currentThread().interrupt();
+    }
+    initialOutputStream.close();
+  }
+
+  private boolean validateSequentialBytes(byte[] buf, int startPos, int len) {
+    for (int i = 0; i < len; i++) {
+      int expected = (i + startPos) % 127;
+
+      if (buf[i] % 127 != expected) {
+        LOG.error(String.format("at position [%d], got [%d] and expected [%d]",
+          startPos, buf[i], expected));
+
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private long tailFile(Path file, long startPos) throws IOException {
+    long numRead = 0;
+    FSDataInputStream inputStream = fileSystem.open(file);
+    inputStream.seek(startPos);
+
+    int len = 4 * 1024;
+    byte[] buf = new byte[len];
+    int read;
+    while ((read = inputStream.read(buf)) > -1) {
+      LOG.info(String.format("read %d bytes", read));
+
+      if (!validateSequentialBytes(buf, (int) (startPos + numRead), read)) {
+        LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
+        throw new ChecksumException(
+          String.format("unable to validate bytes"),
+          startPos
+        );
+      }
+
+      numRead += read;
+    }
+
+    inputStream.close();
+    return numRead + startPos - 1;
+  }
+}
\ No newline at end of file

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=959324&r1=959323&r2=959324&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Jun 30 14:21:03 2010
@@ -249,10 +249,13 @@ public class SimulatedFSDataset impleme
     }
 
     @Override
-    synchronized public void setBytesOnDisk(long bytesOnDisk) {
-      if (!finalized) {
-        oStream.setLength(bytesOnDisk);
-      }
+    public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
+      oStream.setLength(dataLength);
+    }
+
+    @Override
+    public ChunkChecksum getLastChecksumAndDataLen() {
+      return new ChunkChecksum(oStream.getLength(), null);
     }
   }