From: jitendra@apache.org
To: hdfs-commits@hadoop.apache.org
Subject: svn commit: r1125145 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date: Thu, 19 May 2011 22:32:53 -0000
Message-Id: <20110519223253.C392B238890D@eris.apache.org>

Author: jitendra
Date: Thu May 19 22:32:53 2011
New Revision: 1125145

URL: http://svn.apache.org/viewvc?rev=1125145&view=rev
Log:
HDFS-1371. One bad node can incorrectly flag many files as corrupt. Contributed by Tanping Wang.

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1125145&r1=1125144&r2=1125145&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu May 19 22:32:53 2011
@@ -610,6 +610,9 @@ Trunk (unreleased changes)
 
     HDFS-1922. Fix recurring failure of TestJMXGet (Luke Lu via todd)
 
+    HDFS-1371. One bad node can incorrectly flag many files as corrupt.
+    (Tanping Wang via jitendra)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1125145&r1=1125144&r2=1125145&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu May 19 22:32:53 2011
@@ -22,8 +22,13 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -62,7 +67,7 @@ public class DFSInputStream extends FSIn
   private LocatedBlocks locatedBlocks = null;
   private long lastBlockBeingWrittenLength = 0;
   private DatanodeInfo currentNode = null;
-  private ExtendedBlock currentBlock = null;
+  private LocatedBlock currentLocatedBlock = null;
   private long pos = 0;
   private long blockEnd = -1;
 
@@ -204,8 +209,11 @@ public class DFSInputStream extends FSIn
   /**
    * Returns the block containing the target position.
    */
-  public ExtendedBlock getCurrentBlock() {
-    return currentBlock;
+  synchronized public ExtendedBlock getCurrentBlock() {
+    if (currentLocatedBlock == null){
+      return null;
+    }
+    return currentLocatedBlock.getBlock();
   }
 
   /**
@@ -261,7 +269,7 @@ public class DFSInputStream extends FSIn
     if (updatePosition) {
       pos = offset;
       blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
-      currentBlock = blk.getBlock();
+      currentLocatedBlock = blk;
     }
     return blk;
   }
@@ -462,8 +470,9 @@ public class DFSInputStream extends FSIn
    * name readBuffer() is chosen to imply similarity to readBuffer() in
    * ChecksumFileSystem
    */
-  private synchronized int readBuffer(byte buf[], int off, int len)
-      throws IOException {
+  private synchronized int readBuffer(byte buf[], int off, int len,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
     IOException ioe;
 
     /* we retry current node only once. So this is set to true only here.
@@ -479,16 +488,19 @@ public class DFSInputStream extends FSIn
     try {
       return blockReader.read(buf, off, len);
     } catch ( ChecksumException ce ) {
-      DFSClient.LOG.warn("Found Checksum error for " + currentBlock + " from " +
-               currentNode.getName() + " at " + ce.getPos());
-      dfsClient.reportChecksumFailure(src, currentBlock, currentNode);
+      DFSClient.LOG.warn("Found Checksum error for "
+          + getCurrentBlock() + " from " + currentNode.getName()
+          + " at " + ce.getPos());
       ioe = ce;
       retryCurrentNode = false;
+      // we want to remember which block replicas we have tried
+      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+          corruptedBlockMap);
     } catch ( IOException e ) {
       if (!retryCurrentNode) {
-        DFSClient.LOG.warn("Exception while reading from " + currentBlock +
-            " of " + src + " from " + currentNode + ": " +
-            StringUtils.stringifyException(e));
+        DFSClient.LOG.warn("Exception while reading from "
+            + getCurrentBlock() + " of " + src + " from "
+            + currentNode + ": " + StringUtils.stringifyException(e));
       }
       ioe = e;
     }
@@ -519,6 +531,8 @@ public class DFSInputStream extends FSIn
     if (closed) {
       throw new IOException("Stream closed");
     }
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap
+      = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
     failures = 0;
     if (pos < getFileLength()) {
       int retries = 2;
@@ -528,7 +542,7 @@
             currentNode = blockSeekTo(pos);
           }
           int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
-          int result = readBuffer(buf, off, realLen);
+          int result = readBuffer(buf, off, realLen, corruptedBlockMap);
 
           if (result >= 0) {
             pos += result;
@@ -551,12 +565,34 @@
           if (--retries == 0) {
             throw e;
           }
+        } finally {
+          // Check if we need to report corrupt block replicas, whether the
+          // read was successful or a ChecksumException occurred.
+          reportCheckSumFailure(corruptedBlockMap,
+              currentLocatedBlock.getLocations().length);
         }
       }
     }
     return -1;
   }
 
+  /**
+   * Add corrupted block replica into map.
+   * @param corruptedBlockMap
+   */
+  private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    Set<DatanodeInfo> dnSet = null;
+    if((corruptedBlockMap.containsKey(blk))) {
+      dnSet = corruptedBlockMap.get(blk);
+    }else {
+      dnSet = new HashSet<DatanodeInfo>();
+    }
+    if (!dnSet.contains(node)) {
+      dnSet.add(node);
+      corruptedBlockMap.put(blk, dnSet);
+    }
+  }
 
   private DNAddrPair chooseDataNode(LocatedBlock block)
     throws IOException {
@@ -605,8 +641,10 @@
     }
   }
 
-  private void fetchBlockByteRange(LocatedBlock block, long start,
-      long end, byte[] buf, int offset) throws IOException {
+  private void fetchBlockByteRange(LocatedBlock block, long start, long end,
+      byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
     //
     // Connect to best DataNode for desired Block, with potential offset
     //
@@ -646,7 +684,8 @@
         DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for "
             + src + " at " + block.getBlock() + ":" + e.getPos()
             + " from " + chosenNode.getName());
-        dfsClient.reportChecksumFailure(src, block.getBlock(), chosenNode);
+        // we want to remember what we have tried
+        addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
       } catch (IOException e) {
         if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will get a new access token and retry, "
@@ -703,11 +742,21 @@
     // corresponding to position and realLen
     List<LocatedBlock> blockRange = getBlockRange(position, realLen);
     int remaining = realLen;
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap
+      = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
     for (LocatedBlock blk : blockRange) {
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
-      fetchBlockByteRange(blk, targetStart,
-          targetStart + bytesToRead - 1, buffer, offset);
+      try {
+        fetchBlockByteRange(blk, targetStart,
+            targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+      } finally {
+        // Check and report if any block replicas are corrupted.
+        // BlockMissingException may be caught if all block replicas are
+        // corrupted.
+        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
+      }
+
       remaining -= bytesToRead;
       position += bytesToRead;
       offset += bytesToRead;
@@ -718,7 +767,43 @@
     }
     return realLen;
   }
-
+
+  /**
+   * DFSInputStream reports checksum failure.
+   * Case I : the client has tried multiple data nodes and at least one of the
+   * attempts has succeeded. We report the other failures as corrupted block
+   * replicas to the namenode.
+   * Case II: the client has tried all data nodes and all attempts failed. We
+   * only report if the total number of replicas is 1. We do not report
+   * otherwise, since the failures may be because the client itself is unable
+   * to read.
+   * @param corruptedBlockMap map of corrupted block replicas
+   * @param dataNodeCount number of data nodes that hold the block replicas
+   */
+  private void reportCheckSumFailure(
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      int dataNodeCount) {
+    if (corruptedBlockMap.isEmpty()) {
+      return;
+    }
+    Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
+        .entrySet().iterator();
+    Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
+    ExtendedBlock blk = entry.getKey();
+    Set<DatanodeInfo> dnSet = entry.getValue();
+    if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
+        || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
+      DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
+      int i = 0;
+      for (DatanodeInfo dn:dnSet) {
+        locs[i++] = dn;
+      }
+      LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
+      dfsClient.reportChecksumFailure(src, lblocks);
+    }
+    corruptedBlockMap.clear();
+  }
+
   @Override
   public long skip(long n) throws IOException {
     if ( n > 0 ) {
@@ -760,9 +845,10 @@
         }
       } catch (IOException e) {//make following read to retry
         if(DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Exception while seek to " + targetPos +
-              " from " + currentBlock +" of " + src + " from " +
-              currentNode + ": " + StringUtils.stringifyException(e));
+          DFSClient.LOG.debug("Exception while seek to " + targetPos
+              + " from " + getCurrentBlock() + " of " + src
+              + " from " + currentNode + ": "
+              + StringUtils.stringifyException(e));
         }
       }
     }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1125145&r1=1125144&r2=1125145&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu May 19 22:32:53 2011
@@ -689,6 +689,9 @@ public class DistributedFileSystem exten
    * is corrupt but we will report both to the namenode. In the future,
    * we can consider figuring out exactly which block is corrupt.
    */
+  // We do not see a need for user to report block checksum errors and do not
+  // want to rely on user to report block corruptions.
+  @Deprecated
   public boolean reportChecksumFailure(Path f,
     FSDataInputStream in, long inPos,
     FSDataInputStream sums, long sumsPos) {

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java?rev=1125145&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java Thu May 19 22:32:53 2011
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.RandomAccessFile;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
+import org.apache.hadoop.hdfs.tools.DFSck;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import junit.framework.Assert;
+
+/**
+ * This class tests the client reporting corrupted block replicas to the name
+ * node. The reporting policy is: if a block has more than one replica and all
+ * of them are corrupted, the client does not report (since the client itself
+ * may be unable to read); if only some of the replicas are corrupted, the
+ * client reports those corrupted replicas. If the block has a single replica,
+ * the client always reports it when it is found corrupted.
+ */
+public class TestClientReportBadBlock {
+  private static final Log LOG = LogFactory
+      .getLog(TestClientReportBadBlock.class);
+
+  static final long BLOCK_SIZE = 64 * 1024;
+  private static int buffersize;
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem dfs;
+  private static int numDataNodes = 3;
+  private static final Configuration conf = new HdfsConfiguration();
+
+  Random rand = new Random();
+
+  @Before
+  public void startUpCluster() throws IOException {
+    if (System.getProperty("test.build.data") == null) { // to allow test to be
+                                                         // run outside of Ant
+      System.setProperty("test.build.data", "build/test/data");
+    }
+    // disable block scanner
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
+        .build();
+    cluster.waitActive();
+    dfs = (DistributedFileSystem) cluster.getFileSystem();
+    buffersize = conf.getInt("io.file.buffer.size", 4096);
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    dfs.close();
+    cluster.shutdown();
+  }
+
+  /*
+   * This test creates a file with one block replica. Corrupt the block. Make
+   * DFSClient read the corrupted file. The corrupted block is expected to be
+   * reported to the name node.
+   */
+  @Test
+  public void testOneBlockReplica() throws Exception {
+    final short repl = 1;
+    final int corruptBlockNumber = 1;
+    for (int i = 0; i < 2; i++) {
+      // create a file
+      String fileName = "/tmp/testClientReportBadBlock/OneBlockReplica" + i;
+      Path filePath = new Path(fileName);
+      createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
+      if (i == 0) {
+        dfsClientReadFile(filePath);
+      } else {
+        dfsClientReadFileFromPosition(filePath);
+      }
+      // the only block replica is corrupted. The LocatedBlock should be marked
+      // as corrupted. But the corrupted replica is expected to be returned
+      // when calling Namenode#getBlockLocations() since all(one) replicas are
+      // corrupted.
+      int expectedReplicaCount = 1;
+      verifyCorruptedBlockCount(filePath, expectedReplicaCount);
+      verifyFirstBlockCorrupted(filePath, true);
+      verifyFsckBlockCorrupted();
+      testFsckListCorruptFilesBlocks(filePath, -1);
+    }
+  }
+
+  /**
+   * This test creates a file with three block replicas. Corrupt all of the
+   * replicas. Make dfs client read the file. No block corruption should be
+   * reported.
+   */
+  @Test
+  public void testCorruptAllOfThreeReplicas() throws Exception {
+    final short repl = 3;
+    final int corruptBlockNumber = 3;
+    for (int i = 0; i < 2; i++) {
+      // create a file
+      String fileName = "/tmp/testClientReportBadBlock/testCorruptAllReplicas"
+          + i;
+      Path filePath = new Path(fileName);
+      createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
+      // ask dfs client to read the file
+      if (i == 0) {
+        dfsClientReadFile(filePath);
+      } else {
+        dfsClientReadFileFromPosition(filePath);
+      }
+      // As all replicas are corrupted, we expect the DFSClient does NOT report
+      // corrupted replicas to the name node.
+      int expectedReplicasReturned = repl;
+      verifyCorruptedBlockCount(filePath, expectedReplicasReturned);
+      // LocatedBlock should not have the block marked as corrupted.
+      verifyFirstBlockCorrupted(filePath, false);
+      verifyFsckHealth("");
+      testFsckListCorruptFilesBlocks(filePath, 0);
+    }
+  }
+
+  /**
+   * This test creates a file with three block replicas. Corrupt two of the
+   * replicas. Make dfs client read the file. The corrupted blocks with their
+   * owner data nodes should be reported to the name node.
+   */
+  @Test
+  public void testCorruptTwoOutOfThreeReplicas() throws Exception {
+    final short repl = 3;
+    final int corruptBlocReplicas = 2;
+    for (int i = 0; i < 2; i++) {
+      String fileName =
+        "/tmp/testClientReportBadBlock/CorruptTwoOutOfThreeReplicas"+ i;
+      Path filePath = new Path(fileName);
+      createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlocReplicas);
+      int replicaCount = 0;
+      /*
+       * The order of data nodes in the LocatedBlock returned by the name node
+       * is sorted by NetworkTopology#pseudoSortByDistance. In the current
+       * MiniDFSCluster, when a LocatedBlock is returned, the sorting is based
+       * on a random order. That is to say, the DFS client and the simulated
+       * data nodes in the mini DFS cluster are considered not on the same host
+       * nor the same rack. Therefore, even though we corrupted the first two
+       * block replicas in order, when the DFSClient reads the block it is not
+       * guaranteed which replicas (good/bad) will be returned first. So we
+       * re-read the file until the expected number of replicas is returned.
+       */
+      while (replicaCount != repl - corruptBlocReplicas) {
+        if (i == 0) {
+          dfsClientReadFile(filePath);
+        } else {
+          dfsClientReadFileFromPosition(filePath);
+        }
+        LocatedBlocks blocks = dfs.dfs.getNamenode().
+            getBlockLocations(filePath.toString(), 0, Long.MAX_VALUE);
+        replicaCount = blocks.get(0).getLocations().length;
+      }
+      verifyFirstBlockCorrupted(filePath, false);
+      int expectedReplicaCount = repl-corruptBlocReplicas;
+      verifyCorruptedBlockCount(filePath, expectedReplicaCount);
+      verifyFsckHealth("Target Replicas is 3 but found 1 replica");
+      testFsckListCorruptFilesBlocks(filePath, 0);
+    }
+  }
+
+  /**
+   * create a file with one block and corrupt some/all of the block replicas.
+   */
+  private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl,
+      int corruptBlockCount) throws IOException, AccessControlException,
+      FileNotFoundException, UnresolvedLinkException {
+    DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0);
+    DFSTestUtil.waitReplication(dfs, filePath, repl);
+    // Locate the file blocks by asking name node
+    final LocatedBlocks locatedblocks = dfs.dfs.getNamenode()
+        .getBlockLocations(filePath.toString(), 0L, BLOCK_SIZE);
+    Assert.assertEquals(repl, locatedblocks.get(0).getLocations().length);
+    // The file only has one block
+    LocatedBlock lblock = locatedblocks.get(0);
+    DatanodeInfo[] datanodeinfos = lblock.getLocations();
+    ExtendedBlock block = lblock.getBlock();
+    // corrupt some/all of the block replicas
+    for (int i = 0; i < corruptBlockCount; i++) {
+      DatanodeInfo dninfo = datanodeinfos[i];
+      final DataNode dn = cluster.getDataNode(dninfo.getIpcPort());
+      corruptBlock(block, dn);
+      LOG.debug("Corrupted block " + block.getBlockName() + " on data node "
+          + dninfo.getName());
+
+    }
+  }
+
+  /**
+   * Verify the first block of the file is corrupted (for all its replica).
+   */
+  private void verifyFirstBlockCorrupted(Path filePath, boolean isCorrupted)
+      throws AccessControlException, FileNotFoundException,
+      UnresolvedLinkException, IOException {
+    final LocatedBlocks locatedBlocks = dfs.dfs.getNamenode()
+        .getBlockLocations(filePath.toUri().getPath(), 0, Long.MAX_VALUE);
+    final LocatedBlock firstLocatedBlock = locatedBlocks.get(0);
+    Assert.assertEquals(isCorrupted, firstLocatedBlock.isCorrupt());
+  }
+
+  /**
+   * Verify the number of corrupted block replicas by fetching the block
+   * location from name node.
+   */
+  private void verifyCorruptedBlockCount(Path filePath, int expectedReplicas)
+      throws AccessControlException, FileNotFoundException,
+      UnresolvedLinkException, IOException {
+    final LocatedBlocks lBlocks = dfs.dfs.getNamenode().getBlockLocations(
+        filePath.toUri().getPath(), 0, Long.MAX_VALUE);
+    // we expect only the first block of the file is used for this test
+    LocatedBlock firstLocatedBlock = lBlocks.get(0);
+    Assert.assertEquals(expectedReplicas,
+        firstLocatedBlock.getLocations().length);
+  }
+
+  /**
+   * Ask dfs client to read the file
+   */
+  private void dfsClientReadFile(Path corruptedFile) throws IOException,
+      UnresolvedLinkException {
+    DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
+    byte[] buf = new byte[buffersize];
+    int nRead = 0; // total number of bytes read
+
+    try {
+      do {
+        nRead = in.read(buf, 0, buf.length);
+      } while (nRead > 0);
+    } catch (ChecksumException ce) {
+      // caught ChecksumException if all replicas are bad, ignore and continue.
+      LOG.debug("DfsClientReadFile caught ChecksumException.");
+    } catch (BlockMissingException bme) {
+      // caught BlockMissingException, ignore.
+      LOG.debug("DfsClientReadFile caught BlockMissingException.");
+    }
+  }
+
+  /**
+   * DFS client read bytes starting from the specified position.
+   */
+  private void dfsClientReadFileFromPosition(Path corruptedFile)
+      throws UnresolvedLinkException, IOException {
+    DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
+    byte[] buf = new byte[buffersize];
+    int startPosition = 2;
+    int nRead = 0; // total number of bytes read
+    try {
+      do {
+        nRead = in.read(startPosition, buf, 0, buf.length);
+        startPosition += buf.length;
+      } while (nRead > 0);
+    } catch (BlockMissingException bme) {
+      LOG.debug("DfsClientReadFile caught BlockMissingException.");
+    }
+  }
+
+  /**
+   * Corrupt a block on a data node. Replace the block file content with content
+   * of 1, 2, ...BLOCK_SIZE.
+   *
+   * @param block
+   *          the ExtendedBlock to be corrupted
+   * @param dn
+   *          the data node where the block needs to be corrupted
+   * @throws FileNotFoundException
+   * @throws IOException
+   */
+  private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
+      throws FileNotFoundException, IOException {
+    final FSDataset data = (FSDataset) dn.getFSDataset();
+    final RandomAccessFile raFile = new RandomAccessFile(
+        data.getBlockFile(block), "rw");
+    final byte[] bytes = new byte[(int) BLOCK_SIZE];
+    for (int i = 0; i < BLOCK_SIZE; i++) {
+      bytes[i] = (byte) (i);
+    }
+    raFile.write(bytes);
+    raFile.close();
+  }
+
+  private static void verifyFsckHealth(String expected) throws Exception {
+    // Fsck health has error code 0.
+    // Make sure filesystem is in healthy state
+    String outStr = runFsck(conf, 0, true, "/");
+    LOG.info(outStr);
+    Assert.assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+    if (!expected.equals("")) {
+      Assert.assertTrue(outStr.contains(expected));
+    }
+  }
+
+  private static void verifyFsckBlockCorrupted() throws Exception {
+    String outStr = runFsck(conf, 1, true, "/");
+    LOG.info(outStr);
+    Assert.assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+  }
+
+  private static void testFsckListCorruptFilesBlocks(Path filePath,
+      int errorCode) throws Exception {
+    String outStr = runFsck(conf, errorCode, true, filePath.toString(),
+        "-list-corruptfileblocks");
+    LOG.info("fsck -list-corruptfileblocks out: " + outStr);
+    if (errorCode != 0) {
+      Assert.assertTrue(outStr.contains("CORRUPT files"));
+    }
+  }
+
+  static String runFsck(Configuration conf, int expectedErrCode,
+      boolean checkErrorCode, String... path) throws Exception {
+    ByteArrayOutputStream bStream = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(bStream, true);
+    int errCode = ToolRunner.run(new DFSck(conf, out), path);
+    if (checkErrorCode)
+      Assert.assertEquals(expectedErrCode, errCode);
+    return bStream.toString();
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=1125145&r1=1125144&r2=1125145&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java Thu May 19 22:32:53 2011
@@ -341,16 +341,17 @@ public class TestFsck extends TestCase {
     int replicaCount = 0;
     Random random = new Random();
     String outStr = null;
+    short factor = 1;
     MiniDFSCluster cluster = null;
     try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
       fs = cluster.getFileSystem();
       Path file1 = new Path("/testCorruptBlock");
-      DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
+      DFSTestUtil.createFile(fs, file1, 1024, factor, 0);
       // Wait until file replication has completed
-      DFSTestUtil.waitReplication(fs, file1, (short)3);
+      DFSTestUtil.waitReplication(fs, file1, factor);
       ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
 
       // Make sure filesystem is in healthy state
@@ -358,18 +359,16 @@
       System.out.println(outStr);
       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
 
-      // corrupt replicas
-      for (int i=0; i < 3; i++) {
-        File blockFile = MiniDFSCluster.getBlockFile(i, block);
-        if (blockFile != null && blockFile.exists()) {
-          RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-          FileChannel channel = raFile.getChannel();
-          String badString = "BADBAD";
-          int rand = random.nextInt((int)channel.size()/2);
-          raFile.seek(rand);
-          raFile.write(badString.getBytes());
-          raFile.close();
-        }
+      // corrupt replicas
+      File blockFile = MiniDFSCluster.getBlockFile(0, block);
+      if (blockFile != null && blockFile.exists()) {
+        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+        FileChannel channel = raFile.getChannel();
+        String badString = "BADBAD";
+        int rand = random.nextInt((int) channel.size()/2);
+        raFile.seek(rand);
+        raFile.write(badString.getBytes());
+        raFile.close();
       }
 
       // Read the file to trigger reportBadBlocks
       try {
@@ -384,7 +383,7 @@
       blocks = dfsClient.getNamenode().
         getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
       replicaCount = blocks.get(0).getLocations().length;
-      while (replicaCount != 3) {
+      while (replicaCount != factor) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException ignore) {
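
For reference, the decision rule inside the new DFSInputStream#reportCheckSumFailure() above boils down to the check sketched below. This is a minimal, self-contained Java illustration, not part of the committed patch; the class name ReportPolicySketch, the helper shouldReport(), and the plain integer counts standing in for ExtendedBlock/DatanodeInfo objects are assumptions made for brevity.

    /**
     * Illustrative sketch (not part of the patch): should the corrupt replicas a
     * client has seen for one block be reported to the namenode?
     */
    public class ReportPolicySketch {

      /**
       * @param corruptReplicaCount replicas the client got checksum errors from
       * @param dataNodeCount       total number of data nodes holding the block
       */
      static boolean shouldReport(int corruptReplicaCount, int dataNodeCount) {
        // Case I: some, but not all, replicas failed. A good copy exists, so the
        // failures are very likely real corruption; report them.
        boolean someFailed =
            corruptReplicaCount > 0 && corruptReplicaCount < dataNodeCount;
        // Case II: every replica failed. Report only when there is a single
        // replica; if several replicas all fail, the client itself may be the
        // one unable to read, so stay silent.
        boolean onlyReplicaFailed =
            dataNodeCount == 1 && corruptReplicaCount == dataNodeCount;
        return someFailed || onlyReplicaFailed;
      }

      public static void main(String[] args) {
        System.out.println(shouldReport(1, 3)); // true:  one bad replica out of three
        System.out.println(shouldReport(3, 3)); // false: all replicas failed, client is suspect
        System.out.println(shouldReport(1, 1)); // true:  the only replica is bad
      }
    }

Under this rule a partially corrupt block is always reported, while a block whose every replica fails is reported only when it has a single replica, matching the Case I/Case II description in the method's javadoc.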