hadoop-hdfs-commits mailing list archives

From: kih...@apache.org
Subject: svn commit: r1484809 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/im...
Date: Tue, 21 May 2013 13:45:14 GMT
Author: kihwal
Date: Tue May 21 13:45:14 2013
New Revision: 1484809

URL: http://svn.apache.org/r1484809
Log:
HDFS-3875. Issue handling checksum errors in write pipeline. Contributed by Kihwal Lee.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java   (with props)
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1484809&r1=1484808&r2=1484809&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue May 21 13:45:14 2013
@@ -2406,6 +2406,8 @@ Release 0.23.8 - UNRELEASED
     HDFS-4805. Webhdfs client is fragile to token renewal errors 
     (daryn via kihwal)
 
+    HDFS-3875. Issue handling checksum errors in write pipeline. (kihwal)
+
 Release 0.23.7 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1484809&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Tue May 21 13:45:14 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used for injecting faults in DFSClient and DFSOutputStream tests.
+ * Calls into this are a no-op in production code. 
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class DFSClientFaultInjector {
+  public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+
+  public static DFSClientFaultInjector get() {
+    return instance;
+  }
+
+  public boolean corruptPacket() {
+    return false;
+  }
+
+  public boolean uncorruptPacket() {
+    return false;
+  }
+}

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
------------------------------------------------------------------------------
    svn:eol-style = native
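
The injector added above follows the usual static-hook pattern: production code calls DFSClientFaultInjector.get() and both hooks return false, so nothing happens; a test replaces the public static instance field to turn packet corruption on. The commit's own test (TestCrcCorruption, further down) does this with a Mockito mock. A hedged alternative sketch using a plain subclass (the class name and the AtomicBoolean bookkeeping are illustrative assumptions, not part of this commit):

import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical test-only injector: corrupts exactly one packet, then heals it. */
class CorruptOnePacketInjector extends DFSClientFaultInjector {
  private final AtomicBoolean corruptFired = new AtomicBoolean(false);
  private final AtomicBoolean uncorruptFired = new AtomicBoolean(false);

  @Override
  public boolean corruptPacket() {
    // Ask DFSOutputStream to flip a byte in the first packet only.
    return corruptFired.compareAndSet(false, true);
  }

  @Override
  public boolean uncorruptPacket() {
    // Ask it to flip the byte back after the send, so the retransmission
    // during pipeline recovery carries clean data.
    return uncorruptFired.compareAndSet(false, true);
  }
}

// Installed the same way the test installs its mock:
//   DFSClientFaultInjector.instance = new CorruptOnePacketInjector();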

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1484809&r1=1484808&r2=1484809&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue May 21 13:45:14 2013
@@ -262,8 +262,18 @@ public class DFSOutputStream extends FSO
       System.arraycopy(header.getBytes(), 0, buf, headerStart,
           header.getSerializedSize());
       
+      // corrupt the data for testing.
+      if (DFSClientFaultInjector.get().corruptPacket()) {
+        buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+      }
+
       // Write the now contiguous full packet to the output stream.
       stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
+
+      // undo corruption.
+      if (DFSClientFaultInjector.get().uncorruptPacket()) {
+        buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+      }
     }
     
     // get the packet's last byte's offset in the block
@@ -331,6 +341,9 @@ public class DFSOutputStream extends FSO
 
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+    /** The last ack sequence number before pipeline failure. */
+    private long lastAckedSeqnoBeforeFailure = -1;
+    private int pipelineRecoveryCount = 0;
     /** Has the current block been hflushed? */
     private boolean isHflushed = false;
     /** Append on an existing block? */
@@ -783,6 +796,23 @@ public class DFSOutputStream extends FSO
         ackQueue.clear();
       }
 
+      // Record the new pipeline failure recovery.
+      if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
+         lastAckedSeqnoBeforeFailure = lastAckedSeqno;
+         pipelineRecoveryCount = 1;
+      } else {
+        // If we had to recover the pipeline five times in a row for the
+        // same packet, this client likely has corrupt data or is corrupting
+        // it during transmission.
+        if (++pipelineRecoveryCount > 5) {
+          DFSClient.LOG.warn("Error recovering pipeline for writing " +
+              block + ". Already retried 5 times for the same packet.");
+          lastException = new IOException("Failing write. Tried pipeline " +
+              "recovery 5 times without success.");
+          streamerClosed = true;
+          return false;
+        }
+      }
       boolean doSleep = setupPipelineForAppendOrRecovery();
       
       if (!streamerClosed && dfsClient.clientRunning) {
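
The DFSOutputStream change above only aborts the write when the same packet has triggered five consecutive pipeline recoveries, i.e. lastAckedSeqno has not advanced between failures. A minimal standalone sketch of that bookkeeping (the field names mirror the diff, but the tracker class itself is an illustrative assumption, not code from this commit):

class PipelineRecoveryTracker {
  private long lastAckedSeqnoBeforeFailure = -1;
  private int pipelineRecoveryCount = 0;

  /** @return false when the client should fail the write instead of rebuilding the pipeline. */
  boolean recordRecovery(long lastAckedSeqno) {
    if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
      // Acks advanced since the previous failure: a new packet is failing, reset the count.
      lastAckedSeqnoBeforeFailure = lastAckedSeqno;
      pipelineRecoveryCount = 1;
      return true;
    }
    // The same packet keeps failing; after five attempts assume the client side
    // is producing or transmitting corrupt data and give up.
    return ++pipelineRecoveryCount <= 5;
  }
}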

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1484809&r1=1484808&r2=1484809&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue May 21 13:45:14 2013
@@ -377,7 +377,8 @@ class BlockReceiver implements Closeable
       clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
     } catch (ChecksumException ce) {
       LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
-      if (srcDataNode != null) {
+      // No need to report to namenode when client is writing.
+      if (srcDataNode != null && isDatanode) {
         try {
           LOG.info("report corrupt " + block + " from datanode " +
                     srcDataNode + " to namenode");
@@ -404,6 +405,19 @@ class BlockReceiver implements Closeable
     diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
   }
 
+  /** 
+   * Check whether checksum needs to be verified.
+   * Skip verifying checksum iff this is not the last one in the 
+   * pipeline and clientName is non-null. i.e. Checksum is verified
+   * on all the datanodes when the data is being written by a 
+   * datanode rather than a client. When the client is writing the data, 
+   * protocol includes acks and only the last datanode needs to verify 
+   * checksum.
+   * @return true if checksum verification is needed, otherwise false.
+   */
+  private boolean shouldVerifyChecksum() {
+    return (mirrorOut == null || isDatanode || needsChecksumTranslation);
+  }
 
   /** 
    * Receives and processes a packet. It can contain many chunks.
@@ -451,9 +465,9 @@ class BlockReceiver implements Closeable
     }
     
     // put in queue for pending acks, unless sync was requested
-    if (responder != null && !syncBlock) {
+    if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
       ((PacketResponder) responder.getRunnable()).enqueue(seqno,
-          lastPacketInBlock, offsetInBlock);
+          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
     //First write the packet to the mirror:
@@ -485,17 +499,26 @@ class BlockReceiver implements Closeable
         throw new IOException("Length of checksums in packet " +
             checksumBuf.capacity() + " does not match calculated checksum " +
             "length " + checksumLen);
-     }
+      }
 
-      /* skip verifying checksum iff this is not the last one in the 
-       * pipeline and clientName is non-null. i.e. Checksum is verified
-       * on all the datanodes when the data is being written by a 
-       * datanode rather than a client. Whe client is writing the data, 
-       * protocol includes acks and only the last datanode needs to verify 
-       * checksum.
-       */
-      if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
-        verifyChunks(dataBuf, checksumBuf);
+      if (shouldVerifyChecksum()) {
+        try {
+          verifyChunks(dataBuf, checksumBuf);
+        } catch (IOException ioe) {
+          // checksum error detected locally. there is no reason to continue.
+          if (responder != null) {
+            try {
+              ((PacketResponder) responder.getRunnable()).enqueue(seqno,
+                  lastPacketInBlock, offsetInBlock,
+                  Status.ERROR_CHECKSUM);
+              // Wait until the responder sends back the response
+              // and interrupt this thread.
+              Thread.sleep(3000);
+            } catch (InterruptedException e) { }
+          }
+          throw new IOException("Terminating due to a checksum error." + ioe);
+        }
+ 
         if (needsChecksumTranslation) {
           // overwrite the checksums in the packet buffer with the
           // appropriate polynomial for the disk storage.
@@ -584,9 +607,9 @@ class BlockReceiver implements Closeable
 
     // if sync was requested, put in queue for pending acks here
     // (after the fsync finished)
-    if (responder != null && syncBlock) {
+    if (responder != null && (syncBlock || shouldVerifyChecksum())) {
       ((PacketResponder) responder.getRunnable()).enqueue(seqno,
-          lastPacketInBlock, offsetInBlock);
+          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
     if (throttler != null) { // throttle I/O
@@ -784,7 +807,7 @@ class BlockReceiver implements Closeable
   }
   
   /**
-   * Processed responses from downstream datanodes in the pipeline
+   * Processes responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
    */
   class PacketResponder implements Runnable, Closeable {   
@@ -837,10 +860,11 @@ class BlockReceiver implements Closeable
      * @param offsetInBlock
      */
     synchronized void enqueue(final long seqno,
-        final boolean lastPacketInBlock, final long offsetInBlock) {
+        final boolean lastPacketInBlock,
+        final long offsetInBlock, final Status ackStatus) {
       if (running) {
         final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
-            System.nanoTime());
+            System.nanoTime(), ackStatus);
         if(LOG.isDebugEnabled()) {
           LOG.debug(myString + ": enqueue " + p);
         }
@@ -986,20 +1010,31 @@ class BlockReceiver implements Closeable
               }
             }
 
+            Status myStatus = pkt == null ? Status.SUCCESS : pkt.ackStatus;
             // construct my ack message
             Status[] replies = null;
             if (mirrorError) { // ack read error
               replies = new Status[2];
-              replies[0] = Status.SUCCESS;
+              replies[0] = myStatus;
               replies[1] = Status.ERROR;
             } else {
               short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
                   : ack.getNumOfReplies();
               replies = new Status[1+ackLen];
-              replies[0] = Status.SUCCESS;
+              replies[0] = myStatus;
               for (int i=0; i<ackLen; i++) {
                 replies[i+1] = ack.getReply(i);
               }
+              // If the mirror has reported that it received a corrupt packet,
+              // do self-destruct to mark myself bad, instead of the mirror node.
+              // The mirror is guaranteed to be good without corrupt data.
+              if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
+                running = false;
+                removeAckHead();
+                LOG.warn("Shutting down writer and responder due to a checksum error.");
+                receiverThread.interrupt();
+                continue;
+              }
             }
             PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
             
@@ -1018,6 +1053,14 @@ class BlockReceiver implements Closeable
               removeAckHead();
               // update bytes acked
             }
+            // terminate after sending response if this node detected 
+            // a checksum error
+            if (myStatus == Status.ERROR_CHECKSUM) {
+              running = false;
+              LOG.warn("Shutting down writer and responder due to a checksum error.");
+              receiverThread.interrupt();
+              continue;
+            }
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
@@ -1062,13 +1105,15 @@ class BlockReceiver implements Closeable
     final boolean lastPacketInBlock;
     final long offsetInBlock;
     final long ackEnqueueNanoTime;
+    final Status ackStatus;
 
     Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock,
-        long ackEnqueueNanoTime) {
+        long ackEnqueueNanoTime, Status ackStatus) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
       this.offsetInBlock = offsetInBlock;
       this.ackEnqueueNanoTime = ackEnqueueNanoTime;
+      this.ackStatus = ackStatus;
     }
 
     @Override
@@ -1077,6 +1122,7 @@ class BlockReceiver implements Closeable
         + ", lastPacketInBlock=" + lastPacketInBlock
         + ", offsetInBlock=" + offsetInBlock
         + ", ackEnqueueNanoTime=" + ackEnqueueNanoTime
+        + ", ackStatus=" + ackStatus
         + ")";
     }
   }
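
Two behaviors in the BlockReceiver hunks above are worth restating: where checksum verification happens (shouldVerifyChecksum()) and the new ERROR_CHECKSUM ack status that lets the node that detected corruption take the blame instead of its mirror. A hedged sketch of the placement rule only (the parameter names are assumptions for illustration; lastInPipeline stands for mirrorOut == null):

static boolean shouldVerifyChecksum(boolean lastInPipeline,
                                    boolean writerIsDatanode,
                                    boolean needsChecksumTranslation) {
  // Client writes: only the tail datanode verifies. Intermediate nodes forward
  // the packet and learn of corruption through an ERROR_CHECKSUM reply, at which
  // point they shut down their writer and responder (see the run() changes above).
  // Datanode-sourced writes (replication): every node verifies, because no acks
  // flow back to the originating datanode.
  return lastInPipeline || writerIsDatanode || needsChecksumTranslation;
}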

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1484809&r1=1484808&r2=1484809&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Tue May 21 13:45:14 2013
@@ -730,13 +730,25 @@ class FsDatasetImpl implements FsDataset
     }
     
     // check replica length
-    if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
+    long bytesAcked = rbw.getBytesAcked();
+    long numBytes = rbw.getNumBytes();
+    if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
       throw new ReplicaNotFoundException("Unmatched length replica " + 
-          replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() + 
-          " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" + 
+          replicaInfo + ": BytesAcked = " + bytesAcked + 
+          " BytesRcvd = " + numBytes + " are not in the range of [" + 
           minBytesRcvd + ", " + maxBytesRcvd + "].");
     }
 
+    // Truncate the potentially corrupt portion.
+    // If the source was client and the last node in the pipeline was lost,
+    // any corrupt data written after the acked length can go unnoticed. 
+    if (numBytes > bytesAcked) {
+      final File replicafile = rbw.getBlockFile();
+      truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+      rbw.setNumBytes(bytesAcked);
+      rbw.setLastChecksumAndDataLen(bytesAcked, null);
+    }
+
     // bump the replica's generation stamp to newGS
     bumpReplicaGS(rbw, newGS);
     

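The FsDatasetImpl change above cuts an RBW replica back to its acknowledged length during append/recovery, because bytes past the last ack were never confirmed by the rest of the pipeline and may be corrupt if this node is the only survivor. A hedged sketch of the rule in isolation (the helper and its signature are assumptions for illustration, not the FsDatasetImpl API):

// Illustrative only: how much of a replica-being-written survives pipeline
// recovery. Anything past the acked length is cut off on disk (via the
// truncateBlock call in the hunk above) because it may be corrupt.
static long recoverableLength(long bytesAcked, long bytesOnDisk) {
  // Example: 96 KB received but only 64 KB acked -> keep 64 KB, drop 32 KB.
  return Math.min(bytesAcked, bytesOnDisk);
}
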
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=1484809&r1=1484808&r2=1484809&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java Tue May 21 13:45:14 2013
@@ -31,11 +31,18 @@ import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClientFaultInjector;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
+import org.junit.Before;
 import org.junit.Test;
 
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
 /**
  * A JUnit test for corrupted file handling.
  * This test creates a bunch of files/directories with replication 
@@ -64,6 +71,79 @@ import org.junit.Test;
  *     replica was created from the non-corrupted replica.
  */
 public class TestCrcCorruption {
+
+  private DFSClientFaultInjector faultInjector;
+
+  @Before
+  public void setUp() throws IOException {
+    faultInjector = Mockito.mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector.instance = faultInjector;
+  }
+
+  /** 
+   * Test case for data corruption during data transmission for
+   * create/write. To recover from corruption while writing, at
+   * least two replicas are needed.
+   */
+  @Test(timeout=50000)
+  public void testCorruptionDuringWrt() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("/test_corruption_file");
+      FSDataOutputStream out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024));
+      byte[] data = new byte[65536];
+      for (int i=0; i < 65536; i++) {
+        data[i] = (byte)(i % 256);
+      }
+
+      for (int i = 0; i < 5; i++) {
+        out.write(data, 0, 65535);
+      }
+      out.hflush();
+      // corrupt the packet once
+      Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false);
+      Mockito.when(faultInjector.uncorruptPacket()).thenReturn(true, false);
+
+      for (int i = 0; i < 5; i++) {
+        out.write(data, 0, 65535);
+      }
+      out.close();
+      // read should succeed
+      FSDataInputStream in = fs.open(file);
+      for(int c; (c = in.read()) != -1; );
+      in.close();
+
+      // test the retry limit
+      out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024));
+
+      // corrupt the packet once and never fix it.
+      Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false);
+      Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false);
+
+      // the client should give up pipeline reconstruction after retries.
+      try {
+        for (int i = 0; i < 5; i++) {
+          out.write(data, 0, 65535);
+        }
+        out.close();
+        fail("Write did not fail");
+      } catch (IOException ioe) {
+        // we should get an ioe
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+      Mockito.when(faultInjector.corruptPacket()).thenReturn(false);
+      Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false);
+    }
+  }
+
+
   /** 
    * check if DFS can handle corrupted CRC blocks
    */


