hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1092432 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/aop/org/apache/hadoop/hdfs/server/datanode/
Date Thu, 14 Apr 2011 18:35:49 GMT
Author: szetszwo
Date: Thu Apr 14 18:35:49 2011
New Revision: 1092432

URL: http://svn.apache.org/viewvc?rev=1092432&view=rev
Log:
HDFS-1833. Reduce repeated string constructions and unnecessary fields, and fix comments in
BlockReceiver.PacketResponder.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1092432&r1=1092431&r2=1092432&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu Apr 14 18:35:49 2011
@@ -109,6 +109,9 @@ Trunk (unreleased changes)
     HDFS-1760. In FSDirectory.getFullPathName(..), it is better to return "/"
     for root directory instead of an empty string.  (Daryn Sharp via szetszwo)
 
+    HDFS-1833. Reduce repeated string constructions and unnecessary fields,
+    and fix comments in BlockReceiver.PacketResponder.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1092432&r1=1092431&r2=1092432&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Apr 14 18:35:49 2011
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.pro
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 import java.io.BufferedOutputStream;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -54,7 +55,7 @@ import org.apache.hadoop.util.StringUtil
  * may copies it to another site. If a throttler is provided,
  * streaming throttling is also supported.
  **/
-class BlockReceiver implements java.io.Closeable, FSConstants {
+class BlockReceiver implements Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -646,7 +647,7 @@ class BlockReceiver implements java.io.C
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      int numTargets) throws IOException {
+      DatanodeInfo[] downstreams) throws IOException {
 
       boolean responderClosed = false;
       mirrorOut = mirrOut;
@@ -656,10 +657,8 @@ class BlockReceiver implements java.io.C
     try {
       if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup, 
-                               new PacketResponder(this, block, mirrIn, 
-                                                   replyOut, numTargets,
-                                                   Thread.currentThread()));
-        responder.start(); // start thread to processes reponses
+            new PacketResponder(replyOut, mirrIn, downstreams));
+        responder.start(); // start thread to processes responses
       }
 
       /* 
@@ -695,8 +694,7 @@ class BlockReceiver implements java.io.C
       }
 
     } catch (IOException ioe) {
-      LOG.info("Exception in receiveBlock for block " + block + 
-               " " + ioe);
+      LOG.info("Exception in receiveBlock for " + block, ioe);
       throw ioe;
     } finally {
       if (!responderClosed) { // Abnormal termination of the flow above
@@ -803,51 +801,71 @@ class BlockReceiver implements java.io.C
     }
   }
   
+  private static enum PacketResponderType {
+    NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
+  }
   
   /**
    * Processed responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
    */
-  class PacketResponder implements Runnable, FSConstants {   
+  class PacketResponder implements Runnable, Closeable, FSConstants {   
 
-    //packet waiting for ack
-    private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    /** queue for packets waiting for ack */
+    private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    /** the thread that spawns this responder */
+    private final Thread receiverThread = Thread.currentThread();
+    /** is this responder running? */
     private volatile boolean running = true;
-    private Block block;
-    DataInputStream mirrorIn;   // input from downstream datanode
-    DataOutputStream replyOut;  // output to upstream datanode
-    private int numTargets;     // number of downstream datanodes including myself
-    private BlockReceiver receiver; // The owner of this responder.
-    private Thread receiverThread; // the thread that spawns this responder
 
+    /** input from the next downstream datanode */
+    private final DataInputStream downstreamIn;
+    /** output to upstream datanode/client */
+    private final DataOutputStream upstreamOut;
+
+    /** The type of this responder */
+    private final PacketResponderType type;
+    /** for log and error messages */
+    private final String myString; 
+
+    @Override
     public String toString() {
-      return "PacketResponder " + numTargets + " for Block " + this.block;
+      return myString;
     }
 
-    PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
-                    DataOutputStream out, int numTargets,
-                    Thread receiverThread) {
-      this.receiverThread = receiverThread;
-      this.receiver = receiver;
-      this.block = b;
-      mirrorIn = in;
-      replyOut = out;
-      this.numTargets = numTargets;
+    PacketResponder(final DataOutputStream upstreamOut,
+        final DataInputStream downstreamIn,
+        final DatanodeInfo[] downstreams) {
+      this.downstreamIn = downstreamIn;
+      this.upstreamOut = upstreamOut;
+
+      this.type = downstreams == null? PacketResponderType.NON_PIPELINE
+          : downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE
+              : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE;
+
+      final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+          .append(": ").append(block).append(", type=").append(type);
+      if (type != PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
+        b.append(", downstreams=").append(downstreams.length)
+            .append(":").append(Arrays.asList(downstreams));
+      }
+      this.myString = b.toString();
     }
 
     /**
      * enqueue the seqno that is still be to acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
-     * @param lastByteInPacket
+     * @param offsetInBlock
      */
-    synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+    synchronized void enqueue(final long seqno,
+        final boolean lastPacketInBlock, final long offsetInBlock) {
       if (running) {
+        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
         if(LOG.isDebugEnabled()) {
-          LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
-                    " to ack queue.");
+          LOG.debug(myString + ": enqueue " + p);
         }
-        ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
+        ackQueue.addLast(p);
         notifyAll();
       }
     }
@@ -855,7 +873,8 @@ class BlockReceiver implements java.io.C
     /**
      * wait for all pending packets to be acked. Then shutdown thread.
      */
-    synchronized void close() {
+    @Override
+    public synchronized void close() {
       while (running && ackQueue.size() != 0 && datanode.shouldRun) {
         try {
           wait();
@@ -864,8 +883,7 @@ class BlockReceiver implements java.io.C
         }
       }
       if(LOG.isDebugEnabled()) {
-        LOG.debug("PacketResponder " + numTargets +
-                 " for block " + block + " Closing down.");
+        LOG.debug(myString + ": closing");
       }
       running = false;
       notifyAll();
@@ -887,21 +905,21 @@ class BlockReceiver implements java.io.C
             PipelineAck ack = new PipelineAck();
             long seqno = PipelineAck.UNKOWN_SEQNO;
             try {
-              if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
+              if (type != PacketResponderType.LAST_IN_PIPELINE
+                  && !mirrorError) {
                 // read an ack from downstream datanode
-                ack.readFields(mirrorIn);
+                ack.readFields(downstreamIn);
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("PacketResponder " + numTargets + " got " + ack);
+                  LOG.debug(myString + " got " + ack);
                 }
                 seqno = ack.getSeqno();
               }
-              if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
+              if (seqno != PipelineAck.UNKOWN_SEQNO
+                  || type == PacketResponderType.LAST_IN_PIPELINE) {
                 synchronized (this) {
                  while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
-                      LOG.debug("PacketResponder " + numTargets + 
-                                " seqno = " + seqno +
-                                " for block " + block +
+                      LOG.debug(myString + ": seqno=" + seqno +
                                 " waiting for local datanode to finish write.");
                     }
                     wait();
@@ -911,11 +929,10 @@ class BlockReceiver implements java.io.C
                   }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  if (numTargets > 0 && seqno != expected) {
-                    throw new IOException("PacketResponder " + numTargets +
-                                          " for block " + block +
-                                          " expected seqno:" + expected +
-                                          " received:" + seqno);
+                  if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
+                      && seqno != expected) {
+                    throw new IOException(myString + "seqno: expected="
+                        + expected + ", received=" + seqno);
                   }
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
@@ -930,8 +947,7 @@ class BlockReceiver implements java.io.C
                 // notify client of the error
                 // and wait for the client to shut down the pipeline
                 mirrorError = true;
-                LOG.info("PacketResponder " + block + " " + numTargets + 
-                      " Exception " + StringUtils.stringifyException(ioe));
+                LOG.info(myString, ioe);
               }
             }
 
@@ -943,8 +959,7 @@ class BlockReceiver implements java.io.C
                * because this datanode has a problem. The upstream datanode
                * will detect that this datanode is bad, and rightly so.
                */
-              LOG.info("PacketResponder " + block +  " " + numTargets +
-                       " : Thread is interrupted.");
+              LOG.info(myString + ": Thread is interrupted.");
               running = false;
               continue;
             }
@@ -952,7 +967,7 @@ class BlockReceiver implements java.io.C
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock) {
-              receiver.close();
+              BlockReceiver.this.close();
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
@@ -960,13 +975,12 @@ class BlockReceiver implements java.io.C
               if (ClientTraceLog.isInfoEnabled() && isClient) {
                 long offset = 0;
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                      "HDFS_WRITE", receiver.clientname, offset,
+                      inAddr, myAddr, block.getNumBytes(),
+                      "HDFS_WRITE", clientname, offset,
                       datanode.dnRegistration.getStorageID(), block, endTime-startTime));
               } else {
-                LOG.info("Received block " + block + 
-                         " of size " + block.getNumBytes() + 
-                         " from " + receiver.inAddr);
+                LOG.info("Received block " + block + " of size "
+                    + block.getNumBytes() + " from " + inAddr);
               }
             }
 
@@ -977,7 +991,8 @@ class BlockReceiver implements java.io.C
               replies[0] = SUCCESS;
               replies[1] = ERROR;
             } else {
-              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+              short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
+                  : ack.getNumOfReplies();
               replies = new Status[1+ackLen];
               replies[0] = SUCCESS;
               for (int i=0; i<ackLen; i++) {
@@ -987,20 +1002,18 @@ class BlockReceiver implements java.io.C
             PipelineAck replyAck = new PipelineAck(expected, replies);
             
             // send my ack back to upstream datanode
-            replyAck.write(replyOut);
-            replyOut.flush();
+            replyAck.write(upstreamOut);
+            upstreamOut.flush();
             if (LOG.isDebugEnabled()) {
-              LOG.debug("PacketResponder " + numTargets + 
-                        " for block " + block +
-                        " responded an ack: " + replyAck);
+              LOG.debug(myString + ", replyAck=" + replyAck);
             }
             if (pkt != null) {
               // remove the packet from the ack queue
               removeAckHead();
               // update bytes acked
               if (replyAck.isSuccess() && 
-                  pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
-                replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+                  pkt.offsetInBlock > replicaInfo.getBytesAcked()) {
+                replicaInfo.setBytesAcked(pkt.offsetInBlock);
               }
             }
         } catch (IOException e) {
@@ -1011,8 +1024,7 @@ class BlockReceiver implements java.io.C
             } catch (IOException ioe) {
               LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
             }
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
+            LOG.info(myString, e);
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption
               receiverThread.interrupt();
@@ -1020,15 +1032,13 @@ class BlockReceiver implements java.io.C
           }
         } catch (Throwable e) {
           if (running) {
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
+            LOG.info(myString, e);
             running = false;
             receiverThread.interrupt();
           }
         }
       }
-      LOG.info("PacketResponder " + numTargets + 
-               " for block " + block + " terminating");
+      LOG.info(myString + " terminating");
     }
     
     /**
@@ -1045,15 +1055,23 @@ class BlockReceiver implements java.io.C
   /**
    * This information is cached by the Datanode in the ackQueue.
    */
-  static private class Packet {
-    long seqno;
-    boolean lastPacketInBlock;
-    long lastByteInBlock;
+  private static class Packet {
+    final long seqno;
+    final boolean lastPacketInBlock;
+    final long offsetInBlock;
 
-    Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+    Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
-      this.lastByteInBlock = lastByteInPacket;
+      this.offsetInBlock = offsetInBlock;
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "(seqno=" + seqno
+        + ", lastPacketInBlock=" + lastPacketInBlock
+        + ", offsetInBlock=" + offsetInBlock
+        + ")";
     }
   }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1092432&r1=1092431&r2=1092432&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Apr 14 18:35:49 2011
@@ -376,7 +376,7 @@ class DataXceiver extends DataTransferPr
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets.length);
+            mirrorAddr, null, targets);
 
         // send close-ack for transfer-RBW/Finalized 
         if (isTransfer) {
@@ -647,7 +647,7 @@ class DataXceiver extends DataTransferPr
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 
-          dataXceiverServer.balanceThrottler, -1);
+          dataXceiverServer.balanceThrottler, null);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, sourceID);

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=1092432&r1=1092431&r2=1092432&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Thu Apr 14 18:35:49 2011
@@ -44,6 +44,11 @@ import org.apache.hadoop.util.DiskChecke
 privileged public aspect BlockReceiverAspects {
   public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
 
+  BlockReceiver BlockReceiver.PacketResponder.getReceiver(){
+    LOG.info("FI: getReceiver() " + getClass().getName());
+    return BlockReceiver.this;
+  }
+
   pointcut callReceivePacket(BlockReceiver blockreceiver) :
     call(* receivePacket(..)) && target(blockreceiver);
 	
@@ -80,7 +85,7 @@ privileged public aspect BlockReceiverAs
 
   after(BlockReceiver.PacketResponder responder)
       throws IOException: afterDownstreamStatusRead(responder) {
-    final DataNode d = responder.receiver.getDataNode();
+    final DataNode d = responder.getReceiver().getDataNode();
     DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
     if (dtTest != null)
       dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeRegistration());
@@ -124,8 +129,9 @@ privileged public aspect BlockReceiverAs
       LOG.debug("FI: no pipeline has been found in acking");
       return;
     }
-    LOG.debug("FI: Acked total bytes from: " + 
-        pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
+    LOG.debug("FI: Acked total bytes from: "
+        + pr.getReceiver().datanode.dnRegistration.getStorageID()
+        + ": " + acked);
     if (pTest instanceof PipelinesTest) {
       bytesAckedService((PipelinesTest)pTest, pr, acked);
     }
@@ -133,7 +139,7 @@ privileged public aspect BlockReceiverAs
 
   private void bytesAckedService 
       (final PipelinesTest pTest, final PacketResponder pr, final long acked) {
-    NodeBytes nb = new NodeBytes(pr.receiver.datanode.dnRegistration, acked);
+    NodeBytes nb = new NodeBytes(pr.getReceiver().datanode.dnRegistration, acked);
     try {
       pTest.fiCallSetBytesAcked.run(nb);
     } catch (IOException e) {
@@ -201,7 +207,7 @@ privileged public aspect BlockReceiverAs
 
   after(BlockReceiver.PacketResponder packetresponder) throws IOException
       : pipelineAck(packetresponder) {
-    final DatanodeRegistration dr = packetresponder.receiver.getDataNode().getDatanodeRegistration();
+    final DatanodeRegistration dr = packetresponder.getReceiver().getDataNode().getDatanodeRegistration();
     LOG.info("FI: fiPipelineAck, datanode=" + dr);
 
     final DataTransferTest test = DataTransferTestUtil.getDataTransferTest();



Mime
View raw message