hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1413826 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
Date Mon, 26 Nov 2012 20:47:58 GMT
Author: suresh
Date: Mon Nov 26 20:47:58 2012
New Revision: 1413826

URL: http://svn.apache.org/viewvc?rev=1413826&view=rev
Log:
HDFS-4200. Reduce the size of synchronized sections in PacketResponder. Contributed by Suresh
Srinivas.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1413826&r1=1413825&r2=1413826&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Nov 26 20:47:58 2012
@@ -167,6 +167,9 @@ Trunk (Unreleased)
     HDFS-4215. Remove locking from addToParent(..) since it is used in image
     loading, and add INode.isFile().  (szetszwo)
 
+    HDFS-4200. Reduce the size of synchronized sections in PacketResponder.
+    (suresh)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1413826&r1=1413825&r2=1413826&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
Mon Nov 26 20:47:58 2012
@@ -638,10 +638,7 @@ class BlockReceiver implements Closeable
         responder.start(); // start thread to processes responses
       }
 
-      /* 
-       * Receive until the last packet.
-       */
-      while (receivePacket() >= 0) {}
+      while (receivePacket() >= 0) { /* Receive until the last packet */ }
 
       // wait for all outstanding packet responses. And then
       // indicate responder to gracefully shutdown.
@@ -724,7 +721,7 @@ class BlockReceiver implements Closeable
   static private long checksum2long(byte[] checksum) {
     long crc = 0L;
     for(int i=0; i<checksum.length; i++) {
-      crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8);
+      crc |= (0xffL&checksum[i])<<((checksum.length-i-1)*8);
     }
     return crc;
   }
@@ -783,24 +780,23 @@ class BlockReceiver implements Closeable
     NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
   }
   
+  private static Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
+  
   /**
    * Processed responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
    */
   class PacketResponder implements Runnable, Closeable {   
-
-    /** queue for packets waiting for ack */
+    /** queue for packets waiting for ack - synchronization using monitor lock */
     private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
     /** the thread that spawns this responder */
     private final Thread receiverThread = Thread.currentThread();
-    /** is this responder running? */
+    /** is this responder running? - synchronization using monitor lock */
     private volatile boolean running = true;
-
     /** input from the next downstream datanode */
     private final DataInputStream downstreamIn;
     /** output to upstream datanode/client */
     private final DataOutputStream upstreamOut;
-
     /** The type of this responder */
     private final PacketResponderType type;
     /** for log and error messages */
@@ -812,8 +808,7 @@ class BlockReceiver implements Closeable
     }
 
     PacketResponder(final DataOutputStream upstreamOut,
-        final DataInputStream downstreamIn,
-        final DatanodeInfo[] downstreams) {
+        final DataInputStream downstreamIn, final DatanodeInfo[] downstreams) {
       this.downstreamIn = downstreamIn;
       this.upstreamOut = upstreamOut;
 
@@ -830,31 +825,49 @@ class BlockReceiver implements Closeable
       this.myString = b.toString();
     }
 
+    private boolean isRunning() {
+      return running && datanode.shouldRun;
+    }
+    
     /**
      * enqueue the seqno that is still be to acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
      * @param offsetInBlock
      */
-    synchronized void enqueue(final long seqno,
-        final boolean lastPacketInBlock, final long offsetInBlock) {
-      if (running) {
-        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
-            System.nanoTime());
-        if(LOG.isDebugEnabled()) {
-          LOG.debug(myString + ": enqueue " + p);
+    void enqueue(final long seqno, final boolean lastPacketInBlock,
+        final long offsetInBlock) {
+      final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
+          System.nanoTime());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug(myString + ": enqueue " + p);
+      }
+      synchronized(this) {
+        if (running) {
+          ackQueue.addLast(p);
+          notifyAll();
         }
-        ackQueue.addLast(p);
-        notifyAll();
       }
     }
+    
+    /** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */
+    synchronized Packet waitForAckHead(long seqno) throws InterruptedException {
+      while (isRunning() && ackQueue.size() == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(myString + ": seqno=" + seqno +
+                    " waiting for local datanode to finish write.");
+        }
+        wait();
+      }
+      return isRunning() ? ackQueue.getFirst() : null;
+    }
 
     /**
      * wait for all pending packets to be acked. Then shutdown thread.
      */
     @Override
     public synchronized void close() {
-      while (running && ackQueue.size() != 0 && datanode.shouldRun) {
+      while (isRunning() && ackQueue.size() != 0) {
         try {
           wait();
         } catch (InterruptedException e) {
@@ -877,147 +890,97 @@ class BlockReceiver implements Closeable
     public void run() {
       boolean lastPacketInBlock = false;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-      while (running && datanode.shouldRun && !lastPacketInBlock) {
-
+      while (isRunning() && !lastPacketInBlock) {
         long totalAckTimeNanos = 0;
         boolean isInterrupted = false;
         try {
-            Packet pkt = null;
-            long expected = -2;
-            PipelineAck ack = new PipelineAck();
-            long seqno = PipelineAck.UNKOWN_SEQNO;
-            long ackRecvNanoTime = 0;
-            try {
-              if (type != PacketResponderType.LAST_IN_PIPELINE
-                  && !mirrorError) {
-                // read an ack from downstream datanode
-                ack.readFields(downstreamIn);
-                ackRecvNanoTime = System.nanoTime();
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug(myString + " got " + ack);
-                }
-                seqno = ack.getSeqno();
+          Packet pkt = null;
+          long expected = -2;
+          PipelineAck ack = new PipelineAck();
+          long seqno = PipelineAck.UNKOWN_SEQNO;
+          long ackRecvNanoTime = 0;
+          try {
+            if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
+              // read an ack from downstream datanode
+              ack.readFields(downstreamIn);
+              ackRecvNanoTime = System.nanoTime();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(myString + " got " + ack);
               }
-              if (seqno != PipelineAck.UNKOWN_SEQNO
-                  || type == PacketResponderType.LAST_IN_PIPELINE) {
-                synchronized (this) {
-                  while (running && datanode.shouldRun && ackQueue.size()
== 0) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug(myString + ": seqno=" + seqno +
-                                " waiting for local datanode to finish write.");
-                    }
-                    wait();
-                  }
-                  if (!running || !datanode.shouldRun) {
-                    break;
-                  }
-                  pkt = ackQueue.getFirst();
-                  expected = pkt.seqno;
-                  if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
-                      && seqno != expected) {
-                    throw new IOException(myString + "seqno: expected="
-                        + expected + ", received=" + seqno);
-                  }
-                  if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
-                    // The total ack time includes the ack times of downstream nodes.
-                    // The value is 0 if this responder doesn't have a downstream
-                    // DN in the pipeline.
-                    totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
-                    // Report the elapsed time from ack send to ack receive minus
-                    // the downstream ack time.
-                    long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
-                    if (ackTimeNanos < 0) {
-                      if (LOG.isDebugEnabled()) {
-                        LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
-                      }
-                    } else {
-                      datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
-                    }
+              seqno = ack.getSeqno();
+            }
+            if (seqno != PipelineAck.UNKOWN_SEQNO
+                || type == PacketResponderType.LAST_IN_PIPELINE) {
+              pkt = waitForAckHead(seqno);
+              if (!isRunning()) {
+                break;
+              }
+              expected = pkt.seqno;
+              if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
+                  && seqno != expected) {
+                throw new IOException(myString + "seqno: expected=" + expected
+                    + ", received=" + seqno);
+              }
+              if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
+                // The total ack time includes the ack times of downstream
+                // nodes.
+                // The value is 0 if this responder doesn't have a downstream
+                // DN in the pipeline.
+                totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
+                // Report the elapsed time from ack send to ack receive minus
+                // the downstream ack time.
+                long ackTimeNanos = totalAckTimeNanos
+                    - ack.getDownstreamAckTimeNanos();
+                if (ackTimeNanos < 0) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Calculated invalid ack time: " + ackTimeNanos
+                        + "ns.");
                   }
-                  lastPacketInBlock = pkt.lastPacketInBlock;
+                } else {
+                  datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
                 }
               }
-            } catch (InterruptedException ine) {
+              lastPacketInBlock = pkt.lastPacketInBlock;
+            }
+          } catch (InterruptedException ine) {
+            isInterrupted = true;
+          } catch (IOException ioe) {
+            if (Thread.interrupted()) {
               isInterrupted = true;
-            } catch (IOException ioe) {
-              if (Thread.interrupted()) {
-                isInterrupted = true;
-              } else {
-                // continue to run even if can not read from mirror
-                // notify client of the error
-                // and wait for the client to shut down the pipeline
-                mirrorError = true;
-                LOG.info(myString, ioe);
-              }
+            } else {
+              // continue to run even if can not read from mirror
+              // notify client of the error
+              // and wait for the client to shut down the pipeline
+              mirrorError = true;
+              LOG.info(myString, ioe);
             }
+          }
 
-            if (Thread.interrupted() || isInterrupted) {
-              /* The receiver thread cancelled this thread. 
-               * We could also check any other status updates from the 
-               * receiver thread (e.g. if it is ok to write to replyOut). 
-               * It is prudent to not send any more status back to the client
-               * because this datanode has a problem. The upstream datanode
-               * will detect that this datanode is bad, and rightly so.
-               */
-              LOG.info(myString + ": Thread is interrupted.");
-              running = false;
-              continue;
-            }
-            
-            // If this is the last packet in block, then close block
-            // file and finalize the block before responding success
-            if (lastPacketInBlock) {
-              BlockReceiver.this.close();
-              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-              block.setNumBytes(replicaInfo.getNumBytes());
-              datanode.data.finalizeBlock(block);
-              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
-              if (ClientTraceLog.isInfoEnabled() && isClient) {
-                long offset = 0;
-                DatanodeRegistration dnR = 
-                  datanode.getDNRegistrationForBP(block.getBlockPoolId());
-                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                      inAddr, myAddr, block.getNumBytes(),
-                      "HDFS_WRITE", clientname, offset,
-                      dnR.getStorageID(), block, endTime-startTime));
-              } else {
-                LOG.info("Received " + block + " size "
-                    + block.getNumBytes() + " from " + inAddr);
-              }
-            }
+          if (Thread.interrupted() || isInterrupted) {
+            /*
+             * The receiver thread cancelled this thread. We could also check
+             * any other status updates from the receiver thread (e.g. if it is
+             * ok to write to replyOut). It is prudent to not send any more
+             * status back to the client because this datanode has a problem.
+             * The upstream datanode will detect that this datanode is bad, and
+             * rightly so.
+             */
+            LOG.info(myString + ": Thread is interrupted.");
+            running = false;
+            continue;
+          }
 
-            // construct my ack message
-            Status[] replies = null;
-            if (mirrorError) { // ack read error
-              replies = new Status[2];
-              replies[0] = Status.SUCCESS;
-              replies[1] = Status.ERROR;
-            } else {
-              short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
-                  : ack.getNumOfReplies();
-              replies = new Status[1+ackLen];
-              replies[0] = Status.SUCCESS;
-              for (int i=0; i<ackLen; i++) {
-                replies[i+1] = ack.getReply(i);
-              }
-            }
-            PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
-            
-            if (replyAck.isSuccess() && 
-                 pkt.offsetInBlock > replicaInfo.getBytesAcked())
-                replicaInfo.setBytesAcked(pkt.offsetInBlock);
-
-            // send my ack back to upstream datanode
-            replyAck.write(upstreamOut);
-            upstreamOut.flush();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(myString + ", replyAck=" + replyAck);
-            }
-            if (pkt != null) {
-              // remove the packet from the ack queue
-              removeAckHead();
-              // update bytes acked
-            }
+          if (lastPacketInBlock) {
+            // Finalize the block and close the block file
+            finalizeBlock(startTime);
+          }
+
+          sendAckUpstream(ack, expected, totalAckTimeNanos,
+              (pkt != null ? pkt.offsetInBlock : 0));
+          if (pkt != null) {
+            // remove the packet from the ack queue
+            removeAckHead();
+          }
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
@@ -1044,6 +1007,66 @@ class BlockReceiver implements Closeable
     }
     
     /**
+     * Finalize the block and close the block file
+     * @param startTime time when BlockReceiver started receiving the block
+     */
+    private void finalizeBlock(long startTime) throws IOException {
+      BlockReceiver.this.close();
+      final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime()
+          : 0;
+      block.setNumBytes(replicaInfo.getNumBytes());
+      datanode.data.finalizeBlock(block);
+      datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+      if (ClientTraceLog.isInfoEnabled() && isClient) {
+        long offset = 0;
+        DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
+            .getBlockPoolId());
+        ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
+            myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
+            dnR.getStorageID(), block, endTime - startTime));
+      } else {
+        LOG.info("Received " + block + " size " + block.getNumBytes()
+            + " from " + inAddr);
+      }
+    }
+    
+    /**
+     * @param ack Ack received from downstream
+     * @param seqno sequence number of ack to be sent upstream
+     * @param totalAckTimeNanos total ack time including all the downstream
+     *          nodes
+     * @param offsetInBlock offset in block for the data in packet
+     */
+    private void sendAckUpstream(PipelineAck ack, long seqno,
+        long totalAckTimeNanos, long offsetInBlock) throws IOException {
+      Status[] replies = null;
+      if (mirrorError) { // ack read error
+        replies = MIRROR_ERROR_STATUS;
+      } else {
+        short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
+            .getNumOfReplies();
+        replies = new Status[1 + ackLen];
+        replies[0] = Status.SUCCESS;
+        for (int i = 0; i < ackLen; i++) {
+          replies[i + 1] = ack.getReply(i);
+        }
+      }
+      PipelineAck replyAck = new PipelineAck(seqno, replies,
+          totalAckTimeNanos);
+      if (replyAck.isSuccess()
+          && offsetInBlock > replicaInfo.getBytesAcked()) {
+        replicaInfo.setBytesAcked(offsetInBlock);
+      }
+
+      // send my ack back to upstream datanode
+      replyAck.write(upstreamOut);
+      upstreamOut.flush();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(myString + ", replyAck=" + replyAck);
+      }
+    }
+    
+    /**
      * Remove a packet from the head of the ack queue
      * 
      * This should be called only when the ack queue is not empty



Mime
View raw message