hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r824552 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
Date Tue, 13 Oct 2009 00:30:15 GMT
Author: hairong
Date: Tue Oct 13 00:30:15 2009
New Revision: 824552

URL: http://svn.apache.org/viewvc?rev=824552&view=rev
Log:
HDFS-673. BlockReceiver#PacketResponder should not remove a packet from the ack queue before
its ack is sent. Contributed by Hairong Kuang. 


Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=824552&r1=824551&r2=824552&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Oct 13 00:30:15 2009
@@ -392,6 +392,9 @@
 
     HDFS-676. Fix NPE in FSDataset.updateReplicaUnderRecovery() (shv)
 
+    HDFS-673. BlockReceiver#PacketResponder should not remove a packet from
+    the ack queue before its ack is sent. (hairong)
+
 Release 0.20.1 - 2009-09-01
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=824552&r1=824551&r2=824552&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Oct 13 00:30:15 2009
@@ -517,6 +517,8 @@
           }
           replicaInfo.setBytesOnDisk(offsetInBlock);
           datanode.myMetrics.bytesWritten.inc(len);
+          /// flush entire packet
+          flush();
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -524,9 +526,6 @@
       }
     }
 
-    /// flush entire packet before sending ack
-    flush();
-
     if (throttler != null) { // throttle I/O
       throttler.throttle(len);
     }
@@ -804,7 +803,7 @@
             if (!running || !datanode.shouldRun) {
               break;
             }
-            Packet pkt = ackQueue.removeFirst();
+            Packet pkt = ackQueue.getFirst();
             long expected = pkt.seqno;
             notifyAll();
             LOG.debug("PacketResponder " + numTargets +
@@ -837,6 +836,7 @@
             replyOut.writeLong(expected);
             SUCCESS.write(replyOut);
             replyOut.flush();
+            ackQueue.removeFirst();
             if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
               replicaInfo.setBytesAcked(pkt.lastByteInBlock);
             }
@@ -872,7 +872,6 @@
       }
 
       boolean lastPacketInBlock = false;
-      Packet pkt = null;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
@@ -880,6 +879,7 @@
         try {
             DataTransferProtocol.Status op = SUCCESS;
             boolean didRead = false;
+            Packet pkt = null;
             long expected = -2;
             try { 
               // read seqno from downstream datanode
@@ -910,7 +910,7 @@
                       throw e;
                     }
                   }
-                  pkt = ackQueue.removeFirst();
+                  pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
                   notifyAll();
                   LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
@@ -1001,12 +1001,17 @@
               op.write(replyOut);
             }
             replyOut.flush();
+            
             LOG.debug("PacketResponder " + block + " " + numTargets + 
                       " responded other status " + " for seqno " + expected);
 
-            if (pkt != null && success && 
-                pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
-              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+            if (pkt != null) {
+              // remove the packet from the queue
+              ackQueue.removeFirst();
+              // update bytes acked
+              if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+                replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+              }
             }
             // If we were unable to read the seqno from downstream, then stop.
             if (expected == -2) {



Mime
View raw message