Return-Path: Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: (qmail 56606 invoked from network); 22 Dec 2009 04:38:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Dec 2009 04:38:57 -0000 Received: (qmail 62658 invoked by uid 500); 22 Dec 2009 04:38:57 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 62626 invoked by uid 500); 22 Dec 2009 04:38:56 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 62616 invoked by uid 99); 22 Dec 2009 04:38:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Dec 2009 04:38:55 +0000 X-ASF-Spam-Status: No, hits=-2.6 required=5.0 tests=AWL,BAYES_00 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Dec 2009 04:38:44 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CAF132388999; Tue, 22 Dec 2009 04:38:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r893066 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/aop/org/apache/hadoop/fi/ src/test/aop/org/apache/hadoop/hdfs/ src/test/aop/org/apache/hadoop/hdfs/server/datanode/ Date: Tue, 22 Dec 2009 04:38:23 -0000 To: hdfs-commits@hadoop.apache.org From: hairong@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091222043824.CAF132388999@eris.apache.org> Author: hairong Date: Tue Dec 22 04:38:16 2009 New Revision: 893066 URL: http://svn.apache.org/viewvc?rev=893066&view=rev Log: HDFS-564. Adding pipeline tests 17-35. 
Contributed by Nicholas, Kan, and Hairong. Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java Modified: hadoop/hdfs/trunk/CHANGES.txt hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java Modified: hadoop/hdfs/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- hadoop/hdfs/trunk/CHANGES.txt (original) +++ hadoop/hdfs/trunk/CHANGES.txt Tue Dec 22 04:38:16 2009 @@ -576,6 +576,8 @@ HDFS-724. Pipeline hangs if one of the block receiver is not responsive. (hairong) + HDFS-564. Adding pipeline tests 17-35. 
(hairong) + Release 0.20.2 - Unreleased IMPROVEMENTS Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec 22 04:38:16 2009 @@ -432,6 +432,14 @@ return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader); } + /** + * Write the received packet to disk (data only) + */ + private void writePacketToDisk(byte[] pktBuf, int startByteToDisk, + int numBytesToDisk) throws IOException { + out.write(pktBuf, startByteToDisk, numBytesToDisk); + } + /** * Receives and processes a packet. It can contain many chunks. * returns the number of data bytes that the packet has. @@ -524,7 +532,7 @@ int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock); int numBytesToDisk = (int)(offsetInBlock-onDiskLen); - out.write(pktBuf, startByteToDisk, numBytesToDisk); + writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk); // If this is a partial chunk, then verify that this is the only // chunk in the packet. Calculate new crc for this chunk. 
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java (original) +++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java Tue Dec 22 04:38:16 2009 @@ -59,30 +59,36 @@ private volatile boolean isSuccess = false; /** Simulate action for the receiverOpWriteBlock pointcut */ - public final ActionContainer fiReceiverOpWriteBlock - = new ActionContainer(); + public final ActionContainer fiReceiverOpWriteBlock + = new ActionContainer(); /** Simulate action for the callReceivePacket pointcut */ - public final ActionContainer fiCallReceivePacket - = new ActionContainer(); + public final ActionContainer fiCallReceivePacket + = new ActionContainer(); + /** Simulate action for the callWritePacketToDisk pointcut */ + public final ActionContainer fiCallWritePacketToDisk + = new ActionContainer(); /** Simulate action for the statusRead pointcut */ - public final ActionContainer fiStatusRead - = new ActionContainer(); + public final ActionContainer fiStatusRead + = new ActionContainer(); + /** Simulate action for the afterDownstreamStatusRead pointcut */ + public final ActionContainer fiAfterDownstreamStatusRead + = new ActionContainer(); /** Simulate action for the pipelineAck pointcut */ - public final ActionContainer fiPipelineAck - = new ActionContainer(); + public final ActionContainer fiPipelineAck + = new ActionContainer(); /** Simulate action for the pipelineClose pointcut */ - public final ActionContainer fiPipelineClose - = new ActionContainer(); + public final ActionContainer fiPipelineClose + = new ActionContainer(); /** Simulate action for the blockFileClose pointcut */ - public final ActionContainer 
fiBlockFileClose - = new ActionContainer(); + public final ActionContainer fiBlockFileClose + = new ActionContainer(); /** Verification action for the pipelineInitNonAppend pointcut */ - public final ActionContainer fiPipelineInitErrorNonAppend - = new ActionContainer(); + public final ActionContainer fiPipelineInitErrorNonAppend + = new ActionContainer(); /** Verification action for the pipelineErrorAfterInit pointcut */ - public final ActionContainer fiPipelineErrorAfterInit - = new ActionContainer(); + public final ActionContainer fiPipelineErrorAfterInit + = new ActionContainer(); /** Get test status */ public boolean isSuccess() { @@ -121,7 +127,8 @@ } /** Action for DataNode */ - public static abstract class DataNodeAction implements Action { + public static abstract class DataNodeAction implements + Action { /** The name of the test */ final String currentTest; /** The index of the datanode */ @@ -195,6 +202,28 @@ } } + /** Throws OutOfMemoryError if the count is zero. */ + public static class CountdownOomAction extends OomAction { + private final CountdownConstraint countdown; + + /** Create an action for datanode i in the pipeline with count down. */ + public CountdownOomAction(String currentTest, int i, int count) { + super(currentTest, i); + countdown = new CountdownConstraint(count); + } + + @Override + public void run(DatanodeID id) { + final DataTransferTest test = getDataTransferTest(); + final Pipeline p = test.getPipeline(id); + if (p.contains(index, id) && countdown.isSatisfied()) { + final String s = toString(id); + FiTestUtil.LOG.info(s); + throw new OutOfMemoryError(s); + } + } + } + /** Throws DiskOutOfSpaceException. */ public static class DoosAction extends DataNodeAction { /** Create an action for datanode i in the pipeline. */ @@ -242,6 +271,28 @@ } } + /** Throws DiskOutOfSpaceException if the count is zero. 
*/ + public static class CountdownDoosAction extends DoosAction { + private final CountdownConstraint countdown; + + /** Create an action for datanode i in the pipeline with count down. */ + public CountdownDoosAction(String currentTest, int i, int count) { + super(currentTest, i); + countdown = new CountdownConstraint(count); + } + + @Override + public void run(DatanodeID id) throws DiskOutOfSpaceException { + final DataTransferTest test = getDataTransferTest(); + final Pipeline p = test.getPipeline(id); + if (p.contains(index, id) && countdown.isSatisfied()) { + final String s = toString(id); + FiTestUtil.LOG.info(s); + throw new DiskOutOfSpaceException(s); + } + } + } + /** * Sleep some period of time so that it slows down the datanode * or sleep forever so that datanode becomes not responding. @@ -307,8 +358,50 @@ } } + /** + * When the count is zero, + * sleep some period of time so that it slows down the datanode + * or sleep forever so that datanode becomes not responding. + */ + public static class CountdownSleepAction extends SleepAction { + private final CountdownConstraint countdown; + + /** + * Create an action for datanode i in the pipeline. + * @param duration In milliseconds, duration <= 0 means sleeping forever. + */ + public CountdownSleepAction(String currentTest, int i, + long duration, int count) { + this(currentTest, i, duration, duration+1, count); + } + + /** Create an action for datanode i in the pipeline with count down. 
*/ + public CountdownSleepAction(String currentTest, int i, + long minDuration, long maxDuration, int count) { + super(currentTest, i, minDuration, maxDuration); + countdown = new CountdownConstraint(count); + } + + @Override + public void run(DatanodeID id) { + final DataTransferTest test = getDataTransferTest(); + final Pipeline p = test.getPipeline(id); + if (p.contains(index, id) && countdown.isSatisfied()) { + final String s = toString(id) + ", duration = [" + + minDuration + "," + maxDuration + ")"; + FiTestUtil.LOG.info(s); + if (maxDuration <= 1) { + for(; true; FiTestUtil.sleep(1000)); //sleep forever + } else { + FiTestUtil.sleep(minDuration, maxDuration); + } + } + } + } + /** Action for pipeline error verification */ - public static class VerificationAction implements Action { + public static class VerificationAction implements + Action { /** The name of the test */ final String currentTest; /** The error index of the datanode */ @@ -343,9 +436,10 @@ * Create a OomAction with a CountdownConstraint * so that it throws OutOfMemoryError if the count is zero. */ - public static ConstraintSatisfactionAction createCountdownOomAction( - String currentTest, int i, int count) { - return new ConstraintSatisfactionAction( + public static ConstraintSatisfactionAction + createCountdownOomAction( + String currentTest, int i, int count) { + return new ConstraintSatisfactionAction( new OomAction(currentTest, i), new CountdownConstraint(count)); } @@ -353,9 +447,10 @@ * Create a DoosAction with a CountdownConstraint * so that it throws DiskOutOfSpaceException if the count is zero. 
*/ - public static ConstraintSatisfactionAction createCountdownDoosAction( + public static ConstraintSatisfactionAction + createCountdownDoosAction( String currentTest, int i, int count) { - return new ConstraintSatisfactionAction( + return new ConstraintSatisfactionAction( new DoosAction(currentTest, i), new CountdownConstraint(count)); } @@ -366,9 +461,9 @@ * sleep some period of time so that it slows down the datanode * or sleep forever so the that datanode becomes not responding. */ - public static ConstraintSatisfactionAction createCountdownSleepAction( + public static ConstraintSatisfactionAction createCountdownSleepAction( String currentTest, int i, long minDuration, long maxDuration, int count) { - return new ConstraintSatisfactionAction( + return new ConstraintSatisfactionAction( new SleepAction(currentTest, i, minDuration, maxDuration), new CountdownConstraint(count)); } @@ -377,7 +472,7 @@ * Same as * createCountdownSleepAction(currentTest, i, duration, duration+1, count). */ - public static ConstraintSatisfactionAction createCountdownSleepAction( + public static ConstraintSatisfactionAction createCountdownSleepAction( String currentTest, int i, long duration, int count) { return createCountdownSleepAction(currentTest, i, duration, duration+1, count); Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java (original) +++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java Tue Dec 22 04:38:16 2009 @@ -59,9 +59,9 @@ /** Class adds new type of action */ public static class HFlushTest extends DataTransferTest { - public final ActionContainer fiCallHFlush = - new ActionContainer(); - public final 
ActionContainer fiErrorOnCallHFlush = - new ActionContainer(); + public final ActionContainer fiCallHFlush = + new ActionContainer(); + public final ActionContainer fiErrorOnCallHFlush = + new ActionContainer(); } } \ No newline at end of file Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java (original) +++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java Tue Dec 22 04:38:16 2009 @@ -17,7 +17,8 @@ */ package org.apache.hadoop.fi; -import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import org.apache.commons.logging.Log; @@ -95,24 +96,23 @@ } /** Action interface */ - public static interface Action { + public static interface Action { /** Run the action with the parameter. */ - public void run(T parameter) throws IOException; + public void run(T parameter) throws E; } /** An ActionContainer contains at most one action. */ - public static class ActionContainer { - private Action action; - + public static class ActionContainer { + private List> actionList = new ArrayList>(); /** Create an empty container. */ public ActionContainer() {} /** Set action. */ - public void set(Action a) {action = a;} + public void set(Action a) {actionList.add(a);} /** Run the action if it exists. */ - public void run(T obj) throws IOException { - if (action != null) { + public void run(T obj) throws E { + for (Action action : actionList) { action.run(obj); } } @@ -147,13 +147,14 @@ } /** An action is fired if all the constraints are satisfied. 
*/ - public static class ConstraintSatisfactionAction implements Action { - private final Action action; + public static class ConstraintSatisfactionAction + implements Action { + private final Action action; private final Constraint[] constraints; /** Constructor */ public ConstraintSatisfactionAction( - Action action, Constraint... constraints) { + Action action, Constraint... constraints) { this.action = action; this.constraints = constraints; } @@ -163,7 +164,7 @@ * Short-circuit-and is used. */ @Override - public final void run(T parameter) throws IOException { + public final void run(T parameter) throws E { for(Constraint c : constraints) { if (!c.isSatisfied()) { return; Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original) +++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Tue Dec 22 04:38:16 2009 @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs; -import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fi.DataTransferTestUtil; @@ -49,14 +47,10 @@ after(DataStreamer datastreamer) returning : pipelineInitNonAppend(datastreamer) { LOG.info("FI: after pipelineInitNonAppend: hasError=" + datastreamer.hasError + " errorIndex=" + datastreamer.errorIndex); - try { - if (datastreamer.hasError) { - DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); - if (dtTest != null ) - dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex); - } - } catch (IOException e) { - throw new RuntimeException(e); + if (datastreamer.hasError) { + DataTransferTest dtTest = 
DataTransferTestUtil.getDataTransferTest(); + if (dtTest != null) + dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex); } } @@ -78,13 +72,9 @@ before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) { LOG.info("FI: before pipelineErrorAfterInit: errorIndex=" + datastreamer.errorIndex); - try { - DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); - if (dtTest != null ) - dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex); - } catch (IOException e) { - throw new RuntimeException(e); - } + DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); + if (dtTest != null ) + dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex); } pointcut pipelineClose(DFSOutputStream out): Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java (original) +++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java Tue Dec 22 04:38:16 2009 @@ -39,7 +39,7 @@ /** * Storing acknowleged bytes num. action for fault injection tests */ - public static class ReceivedCheckAction implements FiTestUtil.Action { + public static class ReceivedCheckAction implements FiTestUtil.Action { String name; LinkedList rcv = ((PipelinesTest) getPipelineTest()).received; LinkedList ack = ((PipelinesTest) getPipelineTest()).acked; @@ -77,7 +77,7 @@ /** * Storing acknowleged bytes num. 
action for fault injection tests */ - public static class AckedCheckAction implements FiTestUtil.Action { + public static class AckedCheckAction implements FiTestUtil.Action { String name; LinkedList rcv = ((PipelinesTest) getPipelineTest()).received; LinkedList ack = ((PipelinesTest) getPipelineTest()).acked; @@ -118,10 +118,10 @@ LinkedList received = new LinkedList(); LinkedList acked = new LinkedList(); - public final ActionContainer fiCallSetNumBytes = - new ActionContainer(); - public final ActionContainer fiCallSetBytesAcked = - new ActionContainer(); + public final ActionContainer fiCallSetNumBytes = + new ActionContainer(); + public final ActionContainer fiCallSetBytesAcked = + new ActionContainer(); private static boolean suspend = false; private static long lastQueuedPacket = -1; Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original) +++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Tue Dec 22 04:38:16 2009 @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fi.DataTransferTestUtil; +import org.apache.hadoop.fi.Pipeline; import org.apache.hadoop.fi.PipelineTest; import org.apache.hadoop.fi.ProbabilityModel; import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest; @@ -44,12 +45,7 @@ public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class); pointcut callReceivePacket(BlockReceiver blockreceiver) : - call (* OutputStream.write(..)) - && withincode (* BlockReceiver.receivePacket(..)) -// to further limit the 
application of this aspect a very narrow 'target' can be used as follows -// && target(DataOutputStream) - && !within(BlockReceiverAspects +) - && this(blockreceiver); + call(* receivePacket(..)) && target(blockreceiver); before(BlockReceiver blockreceiver ) throws IOException : callReceivePacket(blockreceiver) { @@ -67,7 +63,30 @@ } } - // Pointcuts and advises for TestFiPipelines + pointcut callWritePacketToDisk(BlockReceiver blockreceiver) : + call(* writePacketToDisk(..)) && target(blockreceiver); + + before(BlockReceiver blockreceiver + ) throws IOException : callWritePacketToDisk(blockreceiver) { + LOG.info("FI: callWritePacketToDisk"); + DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); + if (dtTest != null) + dtTest.fiCallWritePacketToDisk.run( + blockreceiver.getDataNode().getDatanodeRegistration()); + } + + pointcut afterDownstreamStatusRead(BlockReceiver.PacketResponder responder): + call(void PipelineAck.readFields(DataInput)) && this(responder); + + after(BlockReceiver.PacketResponder responder) + throws IOException: afterDownstreamStatusRead(responder) { + final DataNode d = responder.receiver.getDataNode(); + DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); + if (dtTest != null) + dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeRegistration()); + } + + // Pointcuts and advises for TestFiPipelines pointcut callSetNumBytes(BlockReceiver br, long offset) : call (void ReplicaInPipelineInterface.setNumBytes(long)) && withincode (int BlockReceiver.receivePacket(long, long, boolean, int, int)) Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- 
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java (original) +++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java Tue Dec 22 04:38:16 2009 @@ -19,6 +19,7 @@ import java.io.IOException; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fi.DataTransferTestUtil; import org.apache.hadoop.fi.FiTestUtil; @@ -101,7 +102,7 @@ } private static void runReceiverOpWriteBlockTest(String methodName, - int errorIndex, Action a) throws IOException { + int errorIndex, Action a) throws IOException { FiTestUtil.LOG.info("Running " + methodName + " ..."); final DataTransferTest t = (DataTransferTest) DataTransferTestUtil .initTest(); @@ -113,7 +114,7 @@ } private static void runStatusReadTest(String methodName, int errorIndex, - Action a) throws IOException { + Action a) throws IOException { FiTestUtil.LOG.info("Running " + methodName + " ..."); final DataTransferTest t = (DataTransferTest) DataTransferTestUtil .initTest(); @@ -124,11 +125,11 @@ Assert.assertTrue(t.isSuccess()); } - private static void runCallReceivePacketTest(String methodName, - int errorIndex, Action a) throws IOException { + private static void runCallWritePacketToDisk(String methodName, + int errorIndex, Action a) throws IOException { FiTestUtil.LOG.info("Running " + methodName + " ..."); final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest(); - t.fiCallReceivePacket.set(a); + t.fiCallWritePacketToDisk.set(a); t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, errorIndex)); write1byte(methodName); Assert.assertTrue(t.isSuccess()); @@ -280,7 +281,7 @@ @Test public void pipeline_Fi_14() throws IOException { final String methodName = FiTestUtil.getMethodName(); - runCallReceivePacketTest(methodName, 0, new DoosAction(methodName, 0)); + runCallWritePacketToDisk(methodName, 0, new DoosAction(methodName, 
0)); } /** @@ -291,7 +292,7 @@ @Test public void pipeline_Fi_15() throws IOException { final String methodName = FiTestUtil.getMethodName(); - runCallReceivePacketTest(methodName, 1, new DoosAction(methodName, 1)); + runCallWritePacketToDisk(methodName, 1, new DoosAction(methodName, 1)); } /** @@ -302,11 +303,11 @@ @Test public void pipeline_Fi_16() throws IOException { final String methodName = FiTestUtil.getMethodName(); - runCallReceivePacketTest(methodName, 2, new DoosAction(methodName, 2)); + runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2)); } private static void runPipelineCloseTest(String methodName, - Action a) throws IOException { + Action a) throws IOException { FiTestUtil.LOG.info("Running " + methodName + " ..."); final DataTransferTest t = (DataTransferTest) DataTransferTestUtil .initTest(); @@ -324,7 +325,7 @@ final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest(); final MarkerConstraint marker = new MarkerConstraint(name); t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker)); - t.fiPipelineAck.set(new ConstraintSatisfactionAction(a, marker)); + t.fiPipelineAck.set(new ConstraintSatisfactionAction(a, marker)); write1byte(name); } @@ -442,7 +443,7 @@ } private static void runBlockFileCloseTest(String methodName, - Action a) throws IOException { + Action a) throws IOException { FiTestUtil.LOG.info("Running " + methodName + " ..."); final DataTransferTest t = (DataTransferTest) DataTransferTestUtil .initTest(); Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java?rev=893066&view=auto ============================================================================== --- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java (added) +++ 
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java Tue Dec 22 04:38:16 2009 @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; +import java.util.Random; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fi.DataTransferTestUtil; +import org.apache.hadoop.fi.FiTestUtil; +import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction; +import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction; +import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction; +import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest; +import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction; +import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import 
org.apache.hadoop.hdfs.server.datanode.BlockReceiver; +import org.apache.log4j.Level; + +import org.junit.Assert; +import org.junit.Test; + +/** Test DataTransferProtocol with fault injection. */ +public class TestFiDataTransferProtocol2 { + static final short REPLICATION = 3; + static final long BLOCKSIZE = 1L * (1L << 20); + static final int PACKET_SIZE = 1024; + static final int MIN_N_PACKET = 3; + static final int MAX_N_PACKET = 10; + + static final Configuration conf = new Configuration(); + static { + conf.setInt("dfs.datanode.handler.count", 1); + conf.setInt("dfs.replication", REPLICATION); + conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE); + conf.setInt("dfs.socket.timeout", 5000); + } + + static final byte[] bytes = new byte[MAX_N_PACKET * PACKET_SIZE]; + static final byte[] toRead = new byte[MAX_N_PACKET * PACKET_SIZE]; + + static private FSDataOutputStream createFile(FileSystem fs, Path p + ) throws IOException { + return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096), + REPLICATION, BLOCKSIZE); + } + + { + ((Log4JLogger) BlockReceiver.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL); + } + /** + * 1. create files with dfs + * 2. write MIN_N_PACKET to MAX_N_PACKET packets + * 3. close file + * 4. open the same file + * 5. 
read the bytes and compare results + */ + private static void writeSeveralPackets(String methodName) throws IOException { + final Random r = FiTestUtil.RANDOM.get(); + final int nPackets = FiTestUtil.nextRandomInt(MIN_N_PACKET, MAX_N_PACKET + 1); + final int lastPacketSize = FiTestUtil.nextRandomInt(1, PACKET_SIZE + 1); + final int size = (nPackets - 1)*PACKET_SIZE + lastPacketSize; + + FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets + + ", lastPacketSize=" + lastPacketSize); + + final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null); + final FileSystem dfs = cluster.getFileSystem(); + try { + final Path p = new Path("/" + methodName + "/foo"); + final FSDataOutputStream out = createFile(dfs, p); + + final long seed = r.nextLong(); + final Random ran = new Random(seed); + ran.nextBytes(bytes); + out.write(bytes, 0, size); + out.close(); + + final FSDataInputStream in = dfs.open(p); + int totalRead = 0; + int nRead = 0; + while ((nRead = in.read(toRead, totalRead, size - totalRead)) > 0) { + totalRead += nRead; + } + Assert.assertEquals("Cannot read file.", size, totalRead); + for (int i = 0; i < size; i++) { + Assert.assertTrue("File content differ.", bytes[i] == toRead[i]); + } + } + finally { + dfs.close(); + cluster.shutdown(); + } + } + + private static void initSlowDatanodeTest(DataTransferTest t, SleepAction a) + throws IOException { + t.fiCallReceivePacket.set(a); + t.fiReceiverOpWriteBlock.set(a); + t.fiStatusRead.set(a); + } + + private void runTest17_19(String methodName, int dnIndex) + throws IOException { + FiTestUtil.LOG.info("Running " + methodName + " ..."); + final int maxSleep = 3000; + final DataTransferTest t = (DataTransferTest) DataTransferTestUtil + .initTest(); + initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep)); + initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep)); + initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep)); + 
t.fiCallWritePacketToDisk.set(new CountdownDoosAction(methodName, dnIndex, 3)); + t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex)); + writeSeveralPackets(methodName); + Assert.assertTrue(t.isSuccess()); + } + + private void runTest29_30(String methodName, int dnIndex) throws IOException { + FiTestUtil.LOG.info("Running " + methodName + " ..."); + final int maxSleep = 3000; + final DataTransferTest t = (DataTransferTest) DataTransferTestUtil + .initTest(); + initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep)); + initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep)); + initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep)); + t.fiAfterDownstreamStatusRead.set(new CountdownOomAction(methodName, dnIndex, 3)); + t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex)); + writeSeveralPackets(methodName); + Assert.assertTrue(t.isSuccess()); + } + + private void runTest34_35(String methodName, int dnIndex) throws IOException { + FiTestUtil.LOG.info("Running " + methodName + " ..."); + final DataTransferTest t = (DataTransferTest) DataTransferTestUtil + .initTest(); + t.fiAfterDownstreamStatusRead.set(new CountdownSleepAction(methodName, dnIndex, 0, 3)); + t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex)); + writeSeveralPackets(methodName); + Assert.assertTrue(t.isSuccess()); + } + /** + * Streaming: + * Randomize datanode speed, write several packets, + * DN0 throws a DiskOutOfSpaceError when it writes the third packet to disk. + * Client gets an IOException and determines DN0 bad. + */ + @Test + public void pipeline_Fi_17() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + runTest17_19(methodName, 0); + } + + /** + * Streaming: + * Randomize datanode speed, write several packets, + * DN1 throws a DiskOutOfSpaceError when it writes the third packet to disk. + * Client gets an IOException and determines DN1 bad. 
+ */ + @Test + public void pipeline_Fi_18() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + runTest17_19(methodName, 1); + } + + /** + * Streaming: + * Randomize datanode speed, write several packets, + * DN2 throws a DiskOutOfSpaceError when it writes the third packet to disk. + * Client gets an IOException and determines DN2 bad. + */ + @Test + public void pipeline_Fi_19() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + runTest17_19(methodName, 2); + } + + /** + * Streaming: Client writes several packets with DN0 very slow. Client + * finishes write successfully. + */ + @Test + public void pipeline_Fi_20() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + FiTestUtil.LOG.info("Running " + methodName + " ..."); + final DataTransferTest t = (DataTransferTest) DataTransferTestUtil + .initTest(); + initSlowDatanodeTest(t, new SleepAction(methodName, 0, 3000)); + writeSeveralPackets(methodName); + } + + /** + * Streaming: Client writes several packets with DN1 very slow. Client + * finishes write successfully. + */ + @Test + public void pipeline_Fi_21() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + FiTestUtil.LOG.info("Running " + methodName + " ..."); + final DataTransferTest t = (DataTransferTest) DataTransferTestUtil + .initTest(); + initSlowDatanodeTest(t, new SleepAction(methodName, 1, 3000)); + writeSeveralPackets(methodName); + } + + /** + * Streaming: Client writes several packets with DN2 very slow. Client + * finishes write successfully. 
+ */ + @Test + public void pipeline_Fi_22() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + FiTestUtil.LOG.info("Running " + methodName + " ..."); + final DataTransferTest t = (DataTransferTest) DataTransferTestUtil + .initTest(); + initSlowDatanodeTest(t, new SleepAction(methodName, 2, 3000)); + writeSeveralPackets(methodName); + } + + /** + * Streaming: Randomize datanode speed, write several packets, DN1 throws an + * OutOfMemoryError when it receives the ack of the third packet from DN2. + * Client gets an IOException and determines DN1 bad. + */ + @Test + public void pipeline_Fi_29() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + runTest29_30(methodName, 1); + } + + /** + * Streaming: Randomize datanode speed, write several packets, DN0 throws an + * OutOfMemoryError when it receives the ack of the third packet from DN1. + * Client gets an IOException and determines DN0 bad. + */ + @Test + public void pipeline_Fi_30() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + runTest29_30(methodName, 0); + } + + /** + * Streaming: Write several packets, DN1 never responds when it receives the + * ack of the third packet from DN2. Client gets an IOException and determines + * DN1 bad. + */ + @Test + public void pipeline_Fi_34() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + runTest34_35(methodName, 1); + } + + /** + * Streaming: Write several packets, DN0 never responds when it receives the + * ack of the third packet from DN1. Client gets an IOException and determines + * DN0 bad. 
+ */ + @Test + public void pipeline_Fi_35() throws IOException { + final String methodName = FiTestUtil.getMethodName(); + runTest34_35(methodName, 0); + } +} \ No newline at end of file Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java?rev=893066&r1=893065&r2=893066&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java (original) +++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java Tue Dec 22 04:38:16 2009 @@ -83,7 +83,7 @@ } private static void runPipelineCloseTest(String methodName, - Action a) throws IOException { + Action a) throws IOException { FiTestUtil.LOG.info("Running " + methodName + " ..."); final DataTransferTest t = (DataTransferTest) DataTransferTestUtil .initTest();