hadoop-hdfs-commits mailing list archives

From: hair...@apache.org
Subject: svn commit: r890664 - in /hadoop/hdfs/branches/branch-0.21: ./ .eclipse.templates/.launches/ src/contrib/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server...
Date: Tue, 15 Dec 2009 06:27:20 GMT
Author: hairong
Date: Tue Dec 15 06:27:19 2009
New Revision: 890664

URL: http://svn.apache.org/viewvc?rev=890664&view=rev
Log:
Merge -r 890655 to move the change of HDFS-724 from trunk to branch 0.21.
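
For readers of the archive: HDFS-724 keeps an idle write pipeline alive. When the client's DataStreamer has had nothing to send for half the socket timeout during the DATA_STREAMING stage, it now emits a zero-data heartbeat packet (seqno -1) rather than letting the downstream DataNode's blocking read hit its socket timeout; heartbeat packets are written to the pipeline but never moved to the ack queue. Below is a minimal, self-contained sketch of that idea; the class name, the queue plumbing, and SOCKET_TIMEOUT_MS are hypothetical stand-ins, and only HEART_BEAT_SEQNO comes from the patch.

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class HeartbeatSketch {
      static final long HEART_BEAT_SEQNO = -1L;     // sentinel added by the patch
      static final long SOCKET_TIMEOUT_MS = 60000L; // assumed client socket timeout

      static class Packet {
        final long seqno;
        Packet(long seqno) { this.seqno = seqno; }
        boolean isHeartbeatPacket() { return seqno == HEART_BEAT_SEQNO; }
      }

      private final Deque<Packet> dataQueue = new ArrayDeque<Packet>();
      private long lastPacket = System.currentTimeMillis();

      /** Wait up to half the socket timeout for data, else fall back to a heartbeat. */
      synchronized Packet nextPacketToSend() throws InterruptedException {
        long now = System.currentTimeMillis();
        while (dataQueue.isEmpty() && now - lastPacket < SOCKET_TIMEOUT_MS / 2) {
          long timeout = SOCKET_TIMEOUT_MS / 2 - (now - lastPacket);
          wait(Math.max(timeout, 1)); // releases the monitor while waiting
          now = System.currentTimeMillis();
        }
        // Still idle after the wait: send a heartbeat so the downstream
        // receiver's blocking read does not time out on a healthy pipeline.
        Packet one = dataQueue.isEmpty()
            ? new Packet(HEART_BEAT_SEQNO) : dataQueue.removeFirst();
        lastPacket = System.currentTimeMillis();
        return one;
      }

      /** Producer side: queue a data packet and wake the streamer. */
      synchronized void enqueue(Packet p) {
        dataQueue.addLast(p);
        notifyAll();
      }
    }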

Added:
    hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
      - copied unchanged from r890655, hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
Modified:
    hadoop/hdfs/branches/branch-0.21/   (props changed)
    hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/   (props changed)
    hadoop/hdfs/branches/branch-0.21/CHANGES.txt   (contents, props changed)
    hadoop/hdfs/branches/branch-0.21/build.xml   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/contrib/build.xml   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/java/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/protocol/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
    hadoop/hdfs/branches/branch-0.21/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/branch-0.21/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/branch-0.21/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-265:796829-820463
-/hadoop/hdfs/trunk:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Propchange: hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1 +1 @@
-/hadoop/hdfs/trunk/.eclipse.templates/.launches:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/.eclipse.templates/.launches:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/CHANGES.txt?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.21/CHANGES.txt Tue Dec 15 06:27:19 2009
@@ -525,6 +525,9 @@
     HDFS-185. Disallow chown, chgrp, chmod, setQuota, and setSpaceQuota when
     name-node is in safemode. (Ravi Phulari via shv)
 
+    HDFS-724. Pipeline hangs if one of the block receivers is not responsive.
+    (hairong)
+
 Release 0.20.1 - 2009-09-01
 
   IMPROVEMENTS

Propchange: hadoop/hdfs/branches/branch-0.21/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/CHANGES.txt:713112
 /hadoop/hdfs/branches/HDFS-265/CHANGES.txt:796829-820463
-/hadoop/hdfs/trunk/CHANGES.txt:817853-817863,818294-818298,818801,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/CHANGES.txt:817853-817863,818294-818298,818801,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Propchange: hadoop/hdfs/branches/branch-0.21/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/build.xml:713112
 /hadoop/core/trunk/build.xml:779102
 /hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
-/hadoop/hdfs/trunk/build.xml:817853-817863,818294-818298,818801,824552,824944,825229,826149,828116,828926,829258,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/build.xml:817853-817863,818294-818298,818801,824552,824944,825229,826149,828116,828926,829258,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/contrib/build.xml:713112
 /hadoop/hdfs/branches/HDFS-265/src/contrib/build.xml:796829-820463
-/hadoop/hdfs/trunk/src/contrib/build.xml:817853-817863,818294-818298,818801,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/contrib/build.xml:817853-817863,818294-818298,818801,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Propchange: hadoop/hdfs/branches/branch-0.21/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
-/hadoop/hdfs/trunk/src/java:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/java:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Dec 15 06:27:19 2009
@@ -2434,7 +2434,27 @@
       int     dataPos;
       int     checksumStart;
       int     checksumPos;      
-  
+      private static final long HEART_BEAT_SEQNO = -1L;
+
+      /**
+       *  create a heartbeat packet
+       */
+      Packet() {
+        this.lastPacketInBlock = false;
+        this.numChunks = 0;
+        this.offsetInBlock = 0;
+        this.seqno = HEART_BEAT_SEQNO;
+        
+        buffer = null;
+        int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+        buf = new byte[packetSize];
+        
+        checksumStart = dataStart = packetSize;
+        checksumPos = checksumStart;
+        dataPos = dataStart;
+        maxChunks = 0;
+      }
+      
       // create a new packet
       Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
         this.lastPacketInBlock = false;
@@ -2521,6 +2541,14 @@
         return offsetInBlock + dataPos - dataStart;
       }
       
+      /**
+       * Check if this packet is a heart beat packet
+       * @return true if the sequence number is HEART_BEAT_SEQNO
+       */
+      private boolean isHeartbeatPacket() {
+        return seqno == HEART_BEAT_SEQNO;
+      }
+      
       public String toString() {
         return "packet seqno:" + this.seqno +
         " offsetInBlock:" + this.offsetInBlock + 
@@ -2638,6 +2666,7 @@
        * and closes them. Any error recovery is also done by this thread.
        */
       public void run() {
+        long lastPacket = System.currentTimeMillis();
         while (!streamerClosed && clientRunning) {
 
           // if the Responder encountered an error, shutdown Responder
@@ -2661,19 +2690,32 @@
 
             synchronized (dataQueue) {
               // wait for a packet to be sent.
+              long now = System.currentTimeMillis();
               while ((!streamerClosed && !hasError && clientRunning 
-                  && dataQueue.size() == 0) || doSleep) {
+                  && dataQueue.size() == 0 && 
+                  (stage != BlockConstructionStage.DATA_STREAMING || 
+                   stage == BlockConstructionStage.DATA_STREAMING && 
+                   now - lastPacket < socketTimeout/2)) || doSleep ) {
+                long timeout = socketTimeout/2 - (now-lastPacket);
+                timeout = timeout <= 0 ? 1000 : timeout;
+                timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+                   timeout : 1000;
                 try {
-                  dataQueue.wait(1000);
+                  dataQueue.wait(timeout);
                 } catch (InterruptedException  e) {
                 }
                 doSleep = false;
+                now = System.currentTimeMillis();
               }
-              if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+              if (streamerClosed || hasError || !clientRunning) {
                 continue;
               }
               // get packet to be sent.
-              one = dataQueue.getFirst();
+              if (dataQueue.isEmpty()) {
+                one = new Packet();  // heartbeat packet
+              } else {
+                one = dataQueue.getFirst(); // regular data packet
+              }
             }
 
             // get new block from namenode.
@@ -2719,9 +2761,11 @@
 
             synchronized (dataQueue) {
               // move packet from dataQueue to ackQueue
-              dataQueue.removeFirst();
-              ackQueue.addLast(one);
-              dataQueue.notifyAll();
+              if (!one.isHeartbeatPacket()) {
+                dataQueue.removeFirst();
+                ackQueue.addLast(one);
+                dataQueue.notifyAll();
+              }
             }
 
             if (LOG.isDebugEnabled()) {
@@ -2732,6 +2776,10 @@
             // write out data to remote datanode
             blockStream.write(buf.array(), buf.position(), buf.remaining());
             blockStream.flush();
+            lastPacket = System.currentTimeMillis();
+            
+            if (one.isHeartbeatPacket()) {  //heartbeat packet
+            }
             
             // update bytesSent
             long tmpBytesSent = one.getLastByteOffsetBlock();
@@ -2868,24 +2916,7 @@
               }
               
               long seqno = ack.getSeqno();
-              Packet one = null;
-              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-                continue;
-              } else if (seqno == -2) {
-                // no nothing
-              } else {
-                synchronized (dataQueue) {
-                  one = ackQueue.getFirst();
-                }
-                if (one.seqno != seqno) {
-                  throw new IOException("Responseprocessor: Expecting seqno " + 
-                      " for block " + block +
-                      one.seqno + " but received " + seqno);
-                }
-                isLastPacketInBlock = one.lastPacketInBlock;
-              }
-
-              // processes response status from all datanodes.
+              // processes response status from datanodes.
               for (int i = ack.getNumOfReplies()-1; i >=0  && clientRunning; i--) {
                 final DataTransferProtocol.Status reply = ack.getReply(i);
                 if (reply != SUCCESS) {
@@ -2896,12 +2927,24 @@
                       targets[i].getName());
                 }
               }
+              
+              assert seqno != PipelineAck.UNKOWN_SEQNO : 
+                "Ack for unkown seqno should be a failed ack: " + ack;
+              if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
+                continue;
+              }
 
-              if (one == null) {
-                throw new IOException("Panic: responder did not receive " +
-                    "an ack for a packet: " + seqno);
+              // a success ack for a data packet
+              Packet one = null;
+              synchronized (dataQueue) {
+                one = ackQueue.getFirst();
               }
-              
+              if (one.seqno != seqno) {
+                throw new IOException("Responseprocessor: Expecting seqno " +
+                                      " for block " + block +
+                                      one.seqno + " but received " + seqno);
+              }
+              isLastPacketInBlock = one.lastPacketInBlock;
               // update bytesAcked
               block.setNumBytes(one.getLastByteOffsetBlock());
 

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Dec 15 06:27:19 2009
@@ -458,7 +458,7 @@
   public static class PipelineAck implements Writable {
     private long seqno;
     private Status replies[];
-    final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new Status[0]);
+    public final static long UNKOWN_SEQNO = -2;
 
     /** default constructor **/
     public PipelineAck() {
@@ -495,6 +495,10 @@
      * @return the the ith reply
      */
     public Status getReply(int i) {
+      if (i<0 || i>=replies.length) {
+        throw new IllegalArgumentException("The input parameter " + i + 
+            " should in the range of [0, " + replies.length);
+      }
       return replies[i];
     }
     

Propchange: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,5 +1,5 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:713112
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:776175-785643,785929-786278
 /hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:817353-818319,818321-818553
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
 /hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964,818294-818298

Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec 15 06:27:19 2009
@@ -560,7 +560,7 @@
       throttler.throttle(len);
     }
     
-    return len;
+    return lastPacketInBlock?-1:len;
   }
 
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -589,9 +589,9 @@
       }
 
       /* 
-       * Receive until packet has zero bytes of data.
+       * Receive until the last packet.
        */
-      while (receivePacket() > 0) {}
+      while (receivePacket() >= 0) {}
 
       // wait for all outstanding packet responses. And then
       // indicate responder to gracefully shutdown.
@@ -775,118 +775,11 @@
       notifyAll();
     }
 
-    private synchronized void lastDataNodeRun() {
-      long lastHeartbeat = System.currentTimeMillis();
-      boolean lastPacket = false;
-      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-
-      while (running && datanode.shouldRun && !lastPacket) {
-        long now = System.currentTimeMillis();
-        try {
-
-            // wait for a packet to be sent to downstream datanode
-            while (running && datanode.shouldRun && ackQueue.size() == 0) {
-              long idle = now - lastHeartbeat;
-              long timeout = (datanode.socketTimeout/2) - idle;
-              if (timeout <= 0) {
-                timeout = 1000;
-              }
-              try {
-                wait(timeout);
-              } catch (InterruptedException e) {
-                if (running) {
-                  LOG.info("PacketResponder " + numTargets +
-                           " for block " + block + " Interrupted.");
-                  running = false;
-                }
-                break;
-              }
-          
-              // send a heartbeat if it is time.
-              now = System.currentTimeMillis();
-              if (now - lastHeartbeat > datanode.socketTimeout/2) {
-                PipelineAck.HEART_BEAT.write(replyOut);  // send heart beat
-                replyOut.flush();
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("PacketResponder " + numTargets +
-                            " for block " + block + 
-                            " sent a heartbeat");
-                }
-                lastHeartbeat = now;
-              }
-            }
-
-            if (!running || !datanode.shouldRun) {
-              break;
-            }
-            Packet pkt = ackQueue.getFirst();
-            long expected = pkt.seqno;
-            LOG.debug("PacketResponder " + numTargets +
-                      " for block " + block + 
-                      " acking for packet " + expected);
-
-            // If this is the last packet in block, then close block
-            // file and finalize the block before responding success
-            if (pkt.lastPacketInBlock) {
-              receiver.close();
-              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-              block.setNumBytes(replicaInfo.getNumBytes());
-              datanode.data.finalizeBlock(block);
-              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
-              if (ClientTraceLog.isInfoEnabled() &&
-                  receiver.clientName.length() > 0) {
-                long offset = 0;
-                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                    receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                    "HDFS_WRITE", receiver.clientName, offset,
-                    datanode.dnRegistration.getStorageID(), block, endTime-startTime));
-              } else {
-                LOG.info("Received block " + block + 
-                    " of size " + block.getNumBytes() + 
-                    " from " + receiver.inAddr);
-              }
-              lastPacket = true;
-            }
-
-            new PipelineAck(expected, new Status[]{SUCCESS}).write(replyOut);
-            replyOut.flush();
-            // remove the packet from the ack queue
-            removeAckHead();
-            // update the bytes acked
-            if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
-              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
-            }
-        } catch (Exception e) {
-          LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
-          if (running) {
-            try {
-              datanode.checkDiskError(e); // may throw an exception here
-            } catch (IOException ioe) {
-              LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
-                  ioe);
-            }
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
-            running = false;
-          }
-        }
-      }
-      LOG.info("PacketResponder " + numTargets + 
-               " for block " + block + " terminating");
-    }
-
     /**
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
      */
     public void run() {
-
-      // If this is the last datanode in pipeline, then handle differently
-      if (numTargets == 0) {
-        lastDataNodeRun();
-        return;
-      }
-
       boolean lastPacketInBlock = false;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
@@ -897,19 +790,18 @@
             Packet pkt = null;
             long expected = -2;
             PipelineAck ack = new PipelineAck();
-            try { 
-              // read an ack from downstream datanode
-              ack.readFields(mirrorIn);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("PacketResponder " + numTargets + " got " + ack);
+            long seqno = PipelineAck.UNKOWN_SEQNO;
+            try {
+              if (numTargets != 0) {// not the last DN
+                // read an ack from downstream datanode
+                ack.readFields(mirrorIn);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("PacketResponder " + numTargets + " got " + ack);
+                }
+                seqno = ack.getSeqno();
+                didRead = true;
               }
-              long seqno = ack.getSeqno();
-              didRead = true;
-              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-                ack.write(replyOut);
-                replyOut.flush();
-                continue;
-              } else if (seqno >= 0) {
+              if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
                 synchronized (this) {
                  while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
@@ -925,9 +817,12 @@
                       throw e;
                     }
                   }
+                  if (!running || !datanode.shouldRun) {
+                    break;
+                  }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  if (seqno != expected) {
+                  if (numTargets > 0 && seqno != expected) {
                     throw new IOException("PacketResponder " + numTargets +
                                           " for block " + block +
                                           " expected seqno:" + expected +
@@ -983,14 +878,15 @@
 
             // construct my ack message
             Status[] replies = null;
-            if (!didRead) { // no ack is read
+            if (!didRead && numTargets != 0) { // ack read error
               replies = new Status[2];
               replies[0] = SUCCESS;
               replies[1] = ERROR;
             } else {
-              replies = new Status[1+ack.getNumOfReplies()];
+              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+              replies = new Status[1+ackLen];
               replies[0] = SUCCESS;
-              for (int i=0; i<ack.getNumOfReplies(); i++) {
+              for (int i=0; i<ackLen; i++) {
                 replies[i+1] = ack.getReply(i);
               }
             }
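
With lastDataNodeRun() deleted, a single PacketResponder.run() now covers both the last datanode in the pipeline (numTargets == 0), which acks only for itself, and intermediate datanodes, which prepend their own status to the statuses read from downstream, or report a downstream ERROR when that read failed. A small sketch of the reply construction under those assumptions; RepliesSketch and its Status enum are stand-ins for the real DataTransferProtocol.Status.

    import java.util.Arrays;

    public class RepliesSketch {
      enum Status { SUCCESS, ERROR }

      static Status[] buildReplies(boolean didRead, int numTargets,
                                   Status[] downstream) {
        if (!didRead && numTargets != 0) {
          // could not read the downstream ack: mark downstream as failed
          return new Status[] { Status.SUCCESS, Status.ERROR };
        }
        int ackLen = (numTargets == 0) ? 0 : downstream.length;
        Status[] replies = new Status[1 + ackLen];
        replies[0] = Status.SUCCESS; // this datanode's own status comes first
        for (int i = 0; i < ackLen; i++) {
          replies[i + 1] = downstream[i];
        }
        return replies;
      }

      public static void main(String[] args) {
        // last node in the pipeline: acks only for itself
        System.out.println(Arrays.toString(buildReplies(false, 0, new Status[0])));
        // middle node relaying a successful downstream ack
        System.out.println(Arrays.toString(
            buildReplies(true, 1, new Status[] { Status.SUCCESS })));
      }
    }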

Propchange: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -3,4 +3,4 @@
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:776175-785643,785929-786278
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Propchange: hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/protocol/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1 +1 @@
-/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Modified: hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Tue Dec 15 06:27:19 2009
@@ -99,12 +99,6 @@
     && args(acked) 
     && this(pr);
 
-  pointcut callSetBytesAckedLastDN(PacketResponder pr, long acked) : 
-    call (void ReplicaInPipelineInterface.setBytesAcked(long)) 
-    && withincode (void PacketResponder.lastDataNodeRun())
-    && args(acked) 
-    && this(pr);
-  
   after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) {
     PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
     if (pTest == null) {
@@ -117,19 +111,7 @@
       bytesAckedService((PipelinesTest)pTest, pr, acked);
     }
   }
-  after (PacketResponder pr, long acked) : callSetBytesAckedLastDN (pr, acked) {
-    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
-    if (pTest == null) {
-      LOG.debug("FI: no pipeline has been found in acking");
-      return;
-    }
-    LOG.debug("FI: Acked total bytes from (last DN): " + 
-        pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
-    if (pTest instanceof PipelinesTest) {
-      bytesAckedService((PipelinesTest)pTest, pr, acked); 
-    }
-  }
-  
+
   private void bytesAckedService 
       (final PipelinesTest pTest, final PacketResponder pr, final long acked) {
     NodeBytes nb = new NodeBytes(pr.receiver.datanode.dnRegistration, acked);

Propchange: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
 /hadoop/core/trunk/src/test/hdfs:776175-785643
 /hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
-/hadoop/hdfs/trunk/src/test/hdfs:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/test/hdfs:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java Tue Dec 15 06:27:19 2009
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import org.junit.Test;
@@ -30,6 +34,11 @@
 /** Class contains a set of tests to verify the correctness of 
  * newly introduced {@link FSDataOutputStream#hflush()} method */
 public class TestHFlush {
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
   private final String fName = "hflushtest.dat";
   
   /** The test uses {@link #doTheJob(Configuration, String, long, short)
@@ -143,4 +152,55 @@
       actual[idx] = 0;
     }
   }
+  
+  /** This creates a slow writer and check to see 
+   * if pipeline heartbeats work fine
+   */
+ @Test
+  public void testPipelineHeartbeat() throws Exception {
+    final int DATANODE_NUM = 2;
+    final int fileLen = 6;
+    Configuration conf = new HdfsConfiguration();
+    final int timeout = 2000;
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 
+        timeout);
+
+    final Path p = new Path("/pipelineHeartbeat/foo");
+    System.out.println("p=" + p);
+    
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+    DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+    byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+    // create a new file.
+    FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+
+    stm.write(fileContents, 0, 1);
+    Thread.sleep(timeout);
+    stm.hflush();
+    System.out.println("Wrote 1 byte and hflush " + p);
+
+    // write another byte
+    Thread.sleep(timeout);
+    stm.write(fileContents, 1, 1);
+    stm.hflush();
+    
+    stm.write(fileContents, 2, 1);
+    Thread.sleep(timeout);
+    stm.hflush();
+    
+    stm.write(fileContents, 3, 1);
+    Thread.sleep(timeout);
+    stm.write(fileContents, 4, 1);
+    stm.hflush();
+    
+    stm.write(fileContents, 5, 1);
+    Thread.sleep(timeout);
+    stm.close();
+
+    // verify that entire file is good
+    AppendTestUtil.checkFullFile(fs, p, fileLen,
+        fileContents, "Failed to slowly write to a file");
+  }
 }

Propchange: hadoop/hdfs/branches/branch-0.21/src/webapps/datanode/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/datanode:713112
 /hadoop/core/trunk/src/webapps/datanode:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/datanode:796829-820463
-/hadoop/hdfs/trunk/src/webapps/datanode:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/webapps/datanode:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Propchange: hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/hdfs:713112
 /hadoop/core/trunk/src/webapps/hdfs:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs:796829-820463
-/hadoop/hdfs/trunk/src/webapps/hdfs:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/webapps/hdfs:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655

Propchange: hadoop/hdfs/branches/branch-0.21/src/webapps/secondary/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/secondary:713112
 /hadoop/core/trunk/src/webapps/secondary:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/secondary:796829-820463
-/hadoop/hdfs/trunk/src/webapps/secondary:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/webapps/secondary:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655


