hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r892993 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
Date Mon, 21 Dec 2009 21:41:05 GMT
Author: hairong
Date: Mon Dec 21 21:41:05 2009
New Revision: 892993

URL: http://svn.apache.org/viewvc?rev=892993&view=rev
Log:
HDFS-101. DFS write pipeline: DFSClient sometimes does not detect second datanode failure.
Contributed by Hairong Kuang.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=892993&r1=892992&r2=892993&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Dec 21 21:41:05 2009
@@ -573,6 +573,9 @@
 
     HDFS-456. Fix URI generation for windows file paths. (shv)
 
+    HDFS-724. Pipeline hangs if one of the block receiver is not responsive.
+    (hairong)
+
 Release 0.20.2 - Unreleased
 
   IMPROVEMENTS
@@ -609,8 +612,8 @@
     HDFS-185. Disallow chown, chgrp, chmod, setQuota, and setSpaceQuota when
     name-node is in safemode. (Ravi Phulari via shv)
 
-    HDFS-724. Pipeline hangs if one of the block receiver is not responsive.
-    (hairong)
+    HDFS-101. DFS write pipeline: DFSClient sometimes does not detect second
+    datanode failure. (hairong)
 
 Release 0.20.1 - 2009-09-01
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=892993&r1=892992&r2=892993&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Dec 21 21:41:05 2009
@@ -77,6 +77,7 @@
   private Checksum partialCrc = null;
   private final DataNode datanode;
   final private ReplicaInPipelineInterface replicaInfo;
+  volatile private boolean mirrorError;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
                 String myAddr, BlockConstructionStage stage, 
@@ -217,21 +218,19 @@
 
   /**
    * While writing to mirrorOut, failure to write to mirror should not
-   * affect this datanode unless a client is writing the block.
+   * affect this datanode unless it is caused by interruption.
    */
   private void handleMirrorOutError(IOException ioe) throws IOException {
     LOG.info(datanode.dnRegistration + ":Exception writing block " +
              block + " to mirror " + mirrorAddr + "\n" +
              StringUtils.stringifyException(ioe));
-    mirrorOut = null;
-    //
-    // If stream-copy fails, continue
-    // writing to disk for replication requests. For client
-    // writes, return error so that the client can do error
-    // recovery.
-    //
-    if (clientName.length() > 0) {
+    if (Thread.interrupted()) { // shut down if the thread is interrupted
       throw ioe;
+    } else { // encounter an error while writing to mirror
+      // continue to run even if can not write to mirror
+      // notify client of the error
+      // and wait for the client to shut down the pipeline
+      mirrorError = true;
     }
   }
   
@@ -461,7 +460,7 @@
     }  
 
     //First write the packet to the mirror:
-    if (mirrorOut != null) {
+    if (mirrorOut != null && !mirrorError) {
       try {
         mirrorOut.write(buf.array(), buf.position(), buf.remaining());
         mirrorOut.flush();
@@ -469,7 +468,7 @@
         handleMirrorOutError(e);
       }
     }
-
+    
     buf.position(endOfHeader);        
     
     if (lastPacketInBlock || len == 0) {
@@ -584,7 +583,8 @@
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
                                new PacketResponder(this, block, mirrIn, 
-                                                   replyOut, numTargets));
+                                                   replyOut, numTargets,
+                                                   Thread.currentThread()));
         responder.start(); // start thread to processes reponses
       }
 
@@ -729,13 +729,16 @@
     DataOutputStream replyOut;  // output to upstream datanode
     private int numTargets;     // number of downstream datanodes including myself
     private BlockReceiver receiver; // The owner of this responder.
+    private Thread receiverThread; // the thread that spawns this responder
 
     public String toString() {
       return "PacketResponder " + numTargets + " for Block " + this.block;
     }
 
     PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
-                    DataOutputStream out, int numTargets) {
+                    DataOutputStream out, int numTargets,
+                    Thread receiverThread) {
+      this.receiverThread = receiverThread;
       this.receiver = receiver;
       this.block = b;
       mirrorIn = in;
@@ -786,20 +789,18 @@
 
         boolean isInterrupted = false;
         try {
-            boolean didRead = false;
             Packet pkt = null;
             long expected = -2;
             PipelineAck ack = new PipelineAck();
             long seqno = PipelineAck.UNKOWN_SEQNO;
             try {
-              if (numTargets != 0) {// not the last DN
+              if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror
error
                 // read an ack from downstream datanode
                 ack.readFields(mirrorIn);
                 if (LOG.isDebugEnabled()) {
                   LOG.debug("PacketResponder " + numTargets + " got " + ack);
                 }
                 seqno = ack.getSeqno();
-                didRead = true;
               }
               if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
                 synchronized (this) {
@@ -810,12 +811,7 @@
                                 " for block " + block +
                                 " waiting for local datanode to finish write.");
                     }
-                    try {
-                      wait();
-                    } catch (InterruptedException e) {
-                      isInterrupted = true;
-                      throw e;
-                    }
+                    wait();
                   }
                   if (!running || !datanode.shouldRun) {
                     break;
@@ -831,11 +827,18 @@
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
               }
-            } catch (Throwable e) {
-              if (running) {
+            } catch (InterruptedException ine) {
+              isInterrupted = true;
+            } catch (IOException ioe) {
+              if (Thread.interrupted()) {
+                isInterrupted = true;
+              } else {
+                // continue to run even if can not read from mirror
+                // notify client of the error
+                // and wait for the client to shut down the pipeline
+                mirrorError = true;
                 LOG.info("PacketResponder " + block + " " + numTargets + 
-                         " Exception " + StringUtils.stringifyException(e));
-                running = false;
+                      " Exception " + StringUtils.stringifyException(ioe));
               }
             }
 
@@ -845,8 +848,7 @@
                * receiver thread (e.g. if it is ok to write to replyOut). 
                * It is prudent to not send any more status back to the client
                * because this datanode has a problem. The upstream datanode
-               * will detect a timout on heartbeats and will declare that
-               * this datanode is bad, and rightly so.
+               * will detect that this datanode is bad, and rightly so.
                */
               LOG.info("PacketResponder " + block +  " " + numTargets +
                        " : Thread is interrupted.");
@@ -878,7 +880,7 @@
 
             // construct my ack message
             Status[] replies = null;
-            if (!didRead && numTargets != 0) { // ack read error
+            if (mirrorError) { // ack read error
               replies = new Status[2];
               replies[0] = SUCCESS;
               replies[1] = ERROR;
@@ -909,12 +911,6 @@
                 replicaInfo.setBytesAcked(pkt.lastByteInBlock);
               }
             }
-            // If we forwarded an error response from a downstream datanode
-            // and we are acting on behalf of a client, then we quit. The 
-            // client will drive the recovery mechanism.
-            if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
-              running = false;
-            }
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
@@ -926,12 +922,16 @@
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;
+            if (!Thread.interrupted()) { // failure not caused by interruption
+              receiverThread.interrupt();
+            }
           }
-        } catch (RuntimeException e) {
+        } catch (Throwable e) {
           if (running) {
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;
+            receiverThread.interrupt();
           }
         }
       }



Mime
View raw message