hadoop-hdfs-commits mailing list archives

From: c..@apache.org
Subject: svn commit: r832043 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/aop/org/apache/hadoop/hdfs/ src/test/aop/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/
Date: Mon, 02 Nov 2009 18:51:02 GMT
Author: cos
Date: Mon Nov  2 18:51:02 2009
New Revision: 832043

URL: http://svn.apache.org/viewvc?rev=832043&view=rev
Log:
HDFS-521. Create new tests for pipeline. Contributed by Konstantin Boudnik

Added:
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=832043&r1=832042&r2=832043&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Nov  2 18:51:02 2009
@@ -330,6 +330,8 @@
     HADOOP-5107. Use Maven ant tasks to publish the subproject jars.
     (Giridharan Kesavan via omalley)
 
+    HDFS-521. Create new tests for pipeline (cos)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=832043&r1=832042&r2=832043&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Nov  2 18:51:02 2009
@@ -843,8 +843,7 @@
               lastPacket = true;
             }
 
-            replyOut.writeLong(expected);
-            SUCCESS.write(replyOut);
+            ackReply(expected);
             replyOut.flush();
             // remove the packet from the ack queue
             removeAckHead();
@@ -871,6 +870,14 @@
                " for block " + block + " terminating");
     }
 
+    // This method is introduced to facilitate testing. Otherwise there would be
+    // little chance to bind an AspectJ advice to such a sequence of calls.
+    private void ackReply(long expected) throws IOException {
+      replyOut.writeLong(expected);
+      SUCCESS.write(replyOut);
+    }
+
     /**
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
@@ -984,8 +991,7 @@
             }
 
             // send my status back to upstream datanode
-            replyOut.writeLong(expected); // send seqno upstream
-            SUCCESS.write(replyOut);
+            ackReply(expected);
 
             LOG.debug("PacketResponder " + numTargets + 
                       " for block " + block +

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=832043&r1=832042&r2=832043&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Mon Nov  2 18:51:02 2009
@@ -22,13 +22,15 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.PipelineTest;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream.DataStreamer;
+import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
 import org.junit.Assert;
 
 /** Aspects for DFSClient */
-public aspect DFSClientAspects {
+privileged public aspect DFSClientAspects {
   public static final Log LOG = LogFactory.getLog(DFSClientAspects.class);
 
   pointcut callCreateBlockOutputStream(DataStreamer datastreamer):
@@ -93,4 +95,19 @@
   before(DFSOutputStream out) : pipelineClose(out) {
     LOG.info("FI: before pipelineClose:");
   }
+
+  pointcut checkAckQueue(DFSClient.DFSOutputStream.Packet cp):
+    call (void DFSClient.DFSOutputStream.waitAndQueuePacket(
+            DFSClient.DFSOutputStream.Packet))
+    && withincode (void DFSClient.DFSOutputStream.writeChunk(..))
+    && args(cp);
+
+  after(DFSClient.DFSOutputStream.Packet cp) : checkAckQueue (cp) {
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+    if (pTest != null && pTest instanceof PipelinesTest) {
+      LOG.debug("FI: Recording packet # " + cp.seqno
+          + " where queuing has occurred");
+      ((PipelinesTest) pTest).setVerified(cp.seqno);
+    }
+  }
 }
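
The checkAckQueue pointcut above fires each time DFSOutputStream.writeChunk() hands a
packet to waitAndQueuePacket(), and records the sequence number of the last queued
packet. That is the hook TestFiPipelines.pipeline_06 uses to tell when the client's ack
queue has filled up. For orientation, here is a simplified, from-memory approximation of
the client-side back-pressure involved; the real logic lives in DFSClient.DFSOutputStream,
and the field names and limit below are assumptions rather than part of this commit:

    // Simplified approximation of DFSClient.DFSOutputStream.waitAndQueuePacket().
    // dataQueue holds packets waiting to be streamed; ackQueue holds packets sent but
    // not yet acknowledged. Once their combined size exceeds the limit (about 80
    // packets in this era), callers of writeChunk() block here.
    private void waitAndQueuePacket(Packet packet) throws IOException {
      synchronized (dataQueue) {
        while (!closed && dataQueue.size() + ackQueue.size() > maxPackets) {
          try {
            dataQueue.wait();          // woken when acks drain the queues
          } catch (InterruptedException e) {
            // loop and re-check
          }
        }
        if (closed) {
          throw new IOException("stream is closed");
        }
        dataQueue.addLast(packet);     // hand the packet to the DataStreamer thread
        dataQueue.notifyAll();
      }
    }

This is why pipeline_06 can suppress datanode acks and expect the writer to stall after
roughly 80 unacknowledged packets.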

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java?rev=832043&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java Mon Nov  2 18:51:02 2009
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.PipelineTest;
+import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+
+public class PipelinesTestUtil extends DataTransferTestUtil {
+  /**
+   * Creates a new {@link PipelinesTest} instance and registers it as the current pipeline test.
+   */
+  public static PipelineTest initTest() {
+    return thepipelinetest = new PipelinesTest();
+  }
+
+  /**
+   * Action for fault-injection tests that records the number of received bytes.
+   */
+  public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes> {
+    String name;
+    LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
+    LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
+
+    /**
+     * @param name of the test
+     */
+   public ReceivedCheckAction(String name) {
+     this.name = name;
+   }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run(NodeBytes nb) throws IOException {
+      synchronized (rcv) {
+        rcv.add(nb);
+        for (NodeBytes n : rcv) {
+          long counterPartsBytes = -1;
+          NodeBytes counterPart = null;
+          if (ack.size() > rcv.indexOf(n)) {
+            counterPart = ack.get(rcv.indexOf(n));
+            counterPartsBytes = counterPart.bytes;
+          }
+          assertTrue("FI: Wrong receiving length",
+              counterPartsBytes <= n.bytes);
+          FiTestUtil.LOG.debug("FI: before compare of Recv bytes. Expected " +
+              n.bytes + ", got " + counterPartsBytes);
+        }
+      }
+    }
+  }
+
+  /**
+   * Action for fault-injection tests that records the number of acknowledged bytes.
+   */
+  public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes> {
+    String name;
+    LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
+    LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
+
+    /**
+     * @param name of the test
+     */
+    public AckedCheckAction(String name) {
+      this.name = name;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void run(NodeBytes nb) throws IOException {
+      synchronized (ack) {
+        ack.add(nb);
+        for (NodeBytes n : ack) {
+          NodeBytes counterPart = null;
+          long counterPartsBytes = -1;
+          if (rcv.size() > ack.indexOf(n)) { 
+            counterPart = rcv.get(ack.indexOf(n));
+            counterPartsBytes = counterPart.bytes;
+          }
+          assertTrue("FI: Wrong acknowledged length",
+              counterPartsBytes == n.bytes);
+          FiTestUtil.LOG.debug("FI: before compare of Acked bytes. Expected " +
+              n.bytes + ", got " + counterPartsBytes);
+        }
+      }
+    }
+  }
+
+  /**
+   * Class that adds new types of actions.
+   */
+  public static class PipelinesTest extends DataTransferTest {
+    LinkedList<NodeBytes> received = new LinkedList<NodeBytes>();
+    LinkedList<NodeBytes> acked = new LinkedList<NodeBytes>();
+
+    public final ActionContainer<NodeBytes> fiCallSetNumBytes =
+      new ActionContainer<NodeBytes>();
+    public final ActionContainer<NodeBytes> fiCallSetBytesAcked =
+      new ActionContainer<NodeBytes>();
+    
+    private static boolean suspend = false;
+    private static long lastQueuedPacket = -1;
+    
+    public void setSuspend(boolean flag) {
+      suspend = flag;
+    }
+    public boolean getSuspend () {
+      return suspend;
+    }
+    public void setVerified(long packetNum) {
+      PipelinesTest.lastQueuedPacket = packetNum;
+    }
+    public long getLastQueued() {
+      return lastQueuedPacket;
+    }
+  }
+
+  public static class NodeBytes {
+    DatanodeID id;
+    long bytes;
+    public NodeBytes(DatanodeID id, long bytes) {
+      this.id = id;
+      this.bytes = bytes;
+    }
+  }
+}

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java?rev=832043&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java Mon Nov  2 18:51:02 2009
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.BlockReceiverAspects;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFiPipelines {
+  public static final Log LOG = LogFactory.getLog(TestFiPipelines.class);
+
+  private static short REPL_FACTOR = 3;
+  private static final int RAND_LIMIT = 2000;
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private static Configuration conf;
+  Random rand = new Random(RAND_LIMIT);
+
+  static {
+    initLoggers();
+    setConfiguration();
+  }
+
+  @Before
+  public void startUpCluster() throws IOException {
+    cluster = new MiniDFSCluster(conf, REPL_FACTOR, true, null);
+    fs = (DistributedFileSystem) cluster.getFileSystem();
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    if (fs != null)
+      fs.close();
+    if (cluster != null)
+      cluster.shutdown();
+  }
+  
+  /**
+   * The test initializes and sets actions created by the injection framework. The
+   * actions work with both aspects of sending acknowledgment packets in a pipeline.
+   * Creates and closes a file whose length is smaller than the packet size.
+   * The injected actions check that the number of visible bytes at the datanodes
+   * equals the number of acknowledged bytes.
+   *
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void pipeline_04() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    LOG.debug("Running " + METHOD_NAME);
+
+    final PipelinesTestUtil.PipelinesTest pipst =
+      (PipelinesTestUtil.PipelinesTest) PipelinesTestUtil.initTest();
+
+    pipst.fiCallSetNumBytes.set(new PipelinesTestUtil.ReceivedCheckAction(METHOD_NAME));
+    pipst.fiCallSetBytesAcked.set(new PipelinesTestUtil.AckedCheckAction(METHOD_NAME));
+
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    FSDataOutputStream fsOut = fs.create(filePath);
+    TestPipelines.writeData(fsOut, 2);
+    fs.close();
+  }
+
+  /**
+   * Similar to pipeline_04 but sends many packets into a pipeline 
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void pipeline_05() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    LOG.debug("Running " + METHOD_NAME);
+
+    final PipelinesTestUtil.PipelinesTest pipst =
+      (PipelinesTestUtil.PipelinesTest) PipelinesTestUtil.initTest();
+
+    pipst.fiCallSetNumBytes.set(new PipelinesTestUtil.ReceivedCheckAction(METHOD_NAME));
+    pipst.fiCallSetBytesAcked.set(new PipelinesTestUtil.AckedCheckAction(METHOD_NAME));
+
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    FSDataOutputStream fsOut = fs.create(filePath);
+    for (int i = 0; i < 17; i++) {
+      TestPipelines.writeData(fsOut, 23);
+    }
+    fs.close();
+  } 
+
+  /**
+   * This rather tricky test suppresses acknowledgement packets from a datanode,
+   * which should block any further write attempts once the ackQueue is full.
+   * The test blocks, so the MiniDFSCluster has to be shut down forcibly.
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void pipeline_06() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final int MAX_PACKETS = 80;
+    
+    LOG.debug("Running " + METHOD_NAME);
+
+    final PipelinesTestUtil.PipelinesTest pipst =
+      (PipelinesTestUtil.PipelinesTest) PipelinesTestUtil.initTest();
+
+    pipst.setSuspend(true); // This is ack. suspend test
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    FSDataOutputStream fsOut = fs.create(filePath);
+
+    int cnt = 0;
+    try {
+      // Start an external checker thread, which will verify the test's results
+      // and shut down the MiniDFSCluster for us, because the writes below will
+      // block once the datanodes stop acknowledging packets.
+      QueueChecker cq = new QueueChecker(pipst, MAX_PACKETS);
+      cq.start();
+      // The following value reflects the fact that the size of a packet isn't
+      // necessarily equal to the value of
+      // DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY.
+      // The actual logic is expressed in DFSClient#computePacketChunkSize
+      int bytesToSend = 700;
+      while (cnt < 100 && pipst.getSuspend()) {
+        LOG.debug("_06(): " + cnt++ + " sending another " + bytesToSend + " bytes");
+        TestPipelines.writeData(fsOut, bytesToSend);
+      }
+    } catch (Exception e) {
+      LOG.warn("Getting unexpected exception: ", e);
+    }
+    LOG.debug("Last queued packet number " + pipst.getLastQueued());
+    assertTrue("Shouldn't be able to send more than 81 packets", pipst.getLastQueued() <= 81);
+  }
+
+  private class QueueChecker extends Thread {
+    PipelinesTestUtil.PipelinesTest test;
+    final int MAX;
+    boolean done = false;
+    
+    public QueueChecker(PipelinesTestUtil.PipelinesTest handle, int maxPackets) {
+      test = handle;
+      MAX = maxPackets;
+    }
+
+    @Override
+    public void run() {
+      while (!done) {
+        LOG.debug("_06: checking for the limit " + test.getLastQueued() + 
+            " and " + MAX);
+        if (test.getLastQueued() >= MAX) {
+          LOG.debug("FI: Resume packets acking");
+          test.setSuspend(false); //Do not suspend ack sending any more
+          done = true;
+        }
+        if (!done)
+          try {
+            LOG.debug("_06: MAX isn't reached yet. Current=" + test.getLastQueued());
+            sleep(100);
+          } catch (InterruptedException e) { }
+      }
+
+      assertTrue("Shouldn't be able to send more than 81 packets", test.getLastQueued() <= 81);
+      try {
+        LOG.debug("_06: shutting down the cluster");
+        // It has to be done this way because the local shutDownCluster() won't
+        // work: it also tries to close the FileSystem instance, which is exactly
+        // where the blocked write is waiting.
+        if (cluster != null) cluster.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+      LOG.debug("End QueueChecker thread");
+    }
+  }
+  
+  private static void setConfiguration() {
+    conf = new Configuration();
+    int customPerChecksumSize = 700;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 100);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, customBlockSize / 2);
+    conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 0);
+  }
+
+  private static void initLoggers() {
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) TestFiPipelines.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FiTestUtil.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) BlockReceiverAspects.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DFSClientAspects.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+}
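
Regarding the bytesToSend = 700 comment in pipeline_06 above: with the configuration used
by these tests (bytes-per-checksum 700, client write packet size customBlockSize / 2 =
1050), each data chunk carries 700 bytes of data plus a small per-chunk checksum, and only
one such chunk fits under the configured packet size once the packet header is accounted
for, so every 700-byte write fills roughly one packet. The estimate below is a
back-of-the-envelope sketch; the constants marked as assumptions are not taken from this
commit, and the authoritative logic is DFSClient#computePacketChunkSize:

    // Back-of-the-envelope estimate only; not the real DFSClient code.
    public class PacketSizeEstimate {
      public static void main(String[] args) {
        int bytesPerChecksum = 700;      // DFS_BYTES_PER_CHECKSUM_KEY from setConfiguration()
        int writePacketSize  = 2100 / 2; // DFS_CLIENT_WRITE_PACKET_SIZE_KEY = customBlockSize / 2
        int checksumSize     = 4;        // CRC32 bytes per chunk (assumption)
        int packetOverhead   = 25;       // rough header + length prefix (assumption)

        int chunkSize       = bytesPerChecksum + checksumSize;                             // 704
        int chunksPerPacket = Math.max((writePacketSize - packetOverhead) / chunkSize, 1); // 1
        int dataPerPacket   = chunksPerPacket * bytesPerChecksum;                          // 700

        // Each 700-byte write in pipeline_06 therefore fills about one packet, so the
        // writer stalls after roughly MAX_PACKETS (80) unacknowledged packets.
        System.out.println("estimated data bytes per packet: " + dataPerPacket);
      }
    }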

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=832043&r1=832042&r2=832043&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Mon Nov  2 18:51:02 2009
@@ -24,8 +24,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.PipelineTest;
 import org.apache.hadoop.fi.ProbabilityModel;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.hdfs.server.datanode.BlockReceiver.PacketResponder;
+import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
+import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -34,7 +38,7 @@
  * This aspect takes care about faults injected into datanode.BlockReceiver 
  * class 
  */
-public privileged aspect BlockReceiverAspects {
+privileged public aspect BlockReceiverAspects {
   public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
 
   pointcut callReceivePacket(BlockReceiver blockreceiver) :
@@ -60,6 +64,107 @@
         thisJoinPoint.getStaticPart( ).getSourceLocation());
     }
   }
+  
+  // Pointcuts and advice for TestFiPipelines
+  pointcut callSetNumBytes(BlockReceiver br, long offset) : 
+    call (void ReplicaInPipelineInterface.setNumBytes(long)) 
+    && withincode (int BlockReceiver.receivePacket(long, long, boolean, int, int))
+    && args(offset) 
+    && this(br);
+  
+  after(BlockReceiver br, long offset) : callSetNumBytes(br, offset) {
+    LOG.debug("FI: Received bytes To: " + br.datanode.dnRegistration.getStorageID() + ": " + offset);
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+    if (pTest == null) {
+      LOG.debug("FI: no pipeline has been found in receiving");
+      return;
+    }
+    if (!(pTest instanceof PipelinesTest)) {
+      return;
+    }
+    NodeBytes nb = new NodeBytes(br.datanode.dnRegistration, offset);
+    try {
+      ((PipelinesTest)pTest).fiCallSetNumBytes.run(nb);
+    } catch (IOException e) {
+      LOG.fatal("FI: no exception is expected here!");
+    }
+  }
+  
+  // Pointcuts and advice for TestFiPipelines
+  pointcut callSetBytesAcked(PacketResponder pr, long acked) : 
+    call (void ReplicaInPipelineInterface.setBytesAcked(long)) 
+    && withincode (void PacketResponder.run())
+    && args(acked) 
+    && this(pr);
+
+  pointcut callSetBytesAckedLastDN(PacketResponder pr, long acked) : 
+    call (void ReplicaInPipelineInterface.setBytesAcked(long)) 
+    && withincode (void PacketResponder.lastDataNodeRun())
+    && args(acked) 
+    && this(pr);
+  
+  after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) {
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+    if (pTest == null) {
+      LOG.debug("FI: no pipeline has been found in acking");
+      return;
+    }
+    LOG.debug("FI: Acked total bytes from: " + 
+        pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
+    if (pTest instanceof PipelinesTest) {
+      bytesAckedService((PipelinesTest)pTest, pr, acked);
+    }
+  }
+  after (PacketResponder pr, long acked) : callSetBytesAckedLastDN (pr, acked) {
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+    if (pTest == null) {
+      LOG.debug("FI: no pipeline has been found in acking");
+      return;
+    }
+    LOG.debug("FI: Acked total bytes from (last DN): " + 
+        pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
+    if (pTest instanceof PipelinesTest) {
+      bytesAckedService((PipelinesTest)pTest, pr, acked); 
+    }
+  }
+  
+  private void bytesAckedService 
+      (final PipelinesTest pTest, final PacketResponder pr, final long acked) {
+    NodeBytes nb = new NodeBytes(pr.receiver.datanode.dnRegistration, acked);
+    try {
+      pTest.fiCallSetBytesAcked.run(nb);
+    } catch (IOException e) {
+      LOG.fatal("No exception should be happening at this point");
+      assert false;
+    }
+  }
+  
+  pointcut preventAckSending () :
+    call (void ackReply(long)) 
+    && within (PacketResponder);
+
+  static int ackCounter = 0;
+  void around () : preventAckSending () {
+    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
+
+    if (pTest == null) { 
+      LOG.debug("FI: remove first ack as expected");
+      proceed();
+      return;
+    }
+    if (!(pTest instanceof PipelinesTest)) {
+      LOG.debug("FI: remove first ack as expected");
+      proceed();
+      return;
+    }
+    if (((PipelinesTest)pTest).getSuspend()) {
+        LOG.debug("FI: suspend the ack");
+        return;
+    }
+    LOG.debug("FI: remove first ack as expected");
+    proceed();
+  }
+  // End of pointcuts and advice for TestFiPipelines
 
   pointcut pipelineClose(BlockReceiver blockreceiver, long offsetInBlock, long seqno,
       boolean lastPacketInBlock, int len, int endOfHeader) :
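
The around() advice on preventAckSending is the piece that makes pipeline_06 work: when a
PipelinesTest is registered and suspension is switched on, the advice returns without
calling proceed(), so the ack that ackReply() would have written upstream is silently
dropped and the client's ackQueue never drains. A plain-Java paraphrase of that decision
follows (a hypothetical helper shown only to make the control flow explicit, not part of
the commit):

    // Hypothetical plain-Java equivalent of the around() advice above.
    void maybeAckReply(long expectedSeqno) throws IOException {
      PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
      if (pTest instanceof PipelinesTest && ((PipelinesTest) pTest).getSuspend()) {
        LOG.debug("FI: suspend the ack");
        return;                   // drop the ack; the client's ackQueue keeps growing
      }
      LOG.debug("FI: remove first ack as expected");
      ackReply(expectedSeqno);    // normal path: corresponds to proceed() in the advice
    }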

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java?rev=832043&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestPipelines.java Mon Nov  2 18:51:02 2009
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+public class TestPipelines {
+  public static final Log LOG = LogFactory.getLog(TestPipelines.class);
+
+  private static short REPL_FACTOR = 3;
+  private static final int RAND_LIMIT = 2000;
+  private static final int FILE_SIZE = 10000;
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private static Configuration conf;
+  static Random rand = new Random(RAND_LIMIT);
+
+  static {
+    initLoggers();
+    setConfiguration();
+  }
+
+  @Before
+  public void startUpCluster() throws IOException {
+    cluster = new MiniDFSCluster(conf, REPL_FACTOR, true, null);
+    fs = (DistributedFileSystem) cluster.getFileSystem();
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    if (fs != null)
+      fs.close();
+    if (cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Creates and closes a file of a certain length.
+   * Calls append to allow the next write() operation to add to the end of it.
+   * After the write() invocation, calls hflush() to make sure the data has made it
+   * through the pipeline, then checks the state of the last block's replica.
+   * It is expected to be in the RBW state.
+   *
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void pipeline_01() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    LOG.debug("Running " + METHOD_NAME);
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+
+    DFSTestUtil.createFile(fs, filePath, FILE_SIZE, REPL_FACTOR, rand.nextLong());
+    LOG.debug("Invoking append but doing nothing otherwise...");
+    FSDataOutputStream ofs = fs.append(filePath);
+    ofs.writeBytes("Some more stuff to write");
+    ((DFSClient.DFSOutputStream) ofs.getWrappedStream()).hflush();
+
+    List<LocatedBlock> lb = cluster.getNameNode().getBlockLocations(
+      filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();
+
+    Replica r = DataNodeAdapter.fetchReplicaInfo(cluster.getDataNodes().get(0),
+      lb.get(0).getBlock().getBlockId());
+    assertTrue("Replica shouldn't be null", r != null);
+    assertEquals(
+      "Should be RBW replica after sequence of calls append()/write()/hflush()",
+      HdfsConstants.ReplicaState.RBW, r.getState());
+    ofs.close();
+  }
+
+  /**
+   * These two test cases are already implemented by
+   * {@link TestReadWhileWriting}.
+   */
+  public void pipeline_02_03() {
+  }
+  
+  static byte[] writeData(final FSDataOutputStream out, final int length)
+    throws IOException {
+    int bytesToWrite = length;
+    byte[] ret = new byte[bytesToWrite];
+    byte[] toWrite = new byte[1024];
+    int written = 0;
+    Random rb = new Random(rand.nextLong());
+    while (bytesToWrite > 0) {
+      rb.nextBytes(toWrite);
+      int bytesToWriteNext = (1024 < bytesToWrite) ? 1024 : bytesToWrite;
+      out.write(toWrite, 0, bytesToWriteNext);
+      System.arraycopy(toWrite, 0, ret, (ret.length - bytesToWrite),
+        bytesToWriteNext);
+      written += bytesToWriteNext;
+      LOG.debug("Written: " + bytesToWriteNext + "; Total: " + written);
+      bytesToWrite -= bytesToWriteNext;
+    }
+    return ret;
+  }
+  
+  private static void setConfiguration() {
+    conf = new Configuration();
+    int customPerChecksumSize = 700;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 100);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, customBlockSize / 2);
+    conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 0);
+  }
+
+  private static void initLoggers() {
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java?rev=832043&r1=832042&r2=832043&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java Mon Nov  2 18:51:02 2009
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -28,13 +24,16 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 /** Test reading from hdfs while a file is being written. */
 public class TestReadWhileWriting {
   {
@@ -48,10 +47,10 @@
   
   /** Test reading while writing. */
   @Test
-  public void testReadWhileWriting() throws Exception {
+  public void pipeline_02_03() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     //enable append
-    conf.setBoolean("dfs.support.append", true);
+    conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
 
     // create cluster
     final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);


