hadoop-common-commits mailing list archives

From yjzhan...@apache.org
Subject hadoop git commit: HDFS-10652. Add a unit test for HDFS-4660. Contributed by Vinayakumar B., Wei-Chiu Chuang, Yongjun Zhang.
Date Sun, 28 Aug 2016 06:42:57 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 42e03d7bf -> 6f10a0a87


HDFS-10652. Add a unit test for HDFS-4660. Contributed by Vinayakumar B., Wei-Chiu Chuang,
Yongjun Zhang.

(cherry picked from commit c25817159af17753b398956cfe6ff14984801b01)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f10a0a8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f10a0a8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f10a0a8

Branch: refs/heads/branch-2.8
Commit: 6f10a0a87b7c801c33f41c65b20f008894ddc320
Parents: 42e03d7
Author: Yongjun Zhang <yzhang@cloudera.com>
Authored: Sat Aug 27 22:46:53 2016 -0700
Committer: Yongjun Zhang <yzhang@cloudera.com>
Committed: Sat Aug 27 23:11:57 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockReceiver.java     |   1 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |   3 +-
 .../server/datanode/DataNodeFaultInjector.java  |   3 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   2 +-
 .../TestClientProtocolForPipelineRecovery.java  | 138 ++++++++++++++++++-
 5 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f10a0a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index fc4b665..2ab2a0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1295,6 +1295,7 @@ class BlockReceiver implements Closeable {
           long ackRecvNanoTime = 0;
           try {
             if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
+              DataNodeFaultInjector.get().failPipeline(replicaInfo, mirrorAddr);
               // read an ack from downstream datanode
               ack.readFields(downstreamIn);
               ackRecvNanoTime = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f10a0a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 7501b7e..4c38b89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2347,7 +2347,8 @@ public class DataNode extends ReconfigurableBase
         blockSender.sendBlock(out, unbufOut, null);
 
         // no response necessary
-        LOG.info(getClass().getSimpleName() + ": Transmitted " + b
+        LOG.info(getClass().getSimpleName() + ", at "
+            + DataNode.this.getDisplayName() + ": Transmitted " + b
             + " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
 
         // read ack

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f10a0a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 4ecbdc0..931c124 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -55,4 +55,7 @@ public class DataNodeFaultInjector {
   public void noRegistration() throws IOException { }
 
   public void failMirrorConnection() throws IOException { }
+
+  public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+      String mirrorAddr) throws IOException { }
 }
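
For context on how this hook takes effect at the call site added in BlockReceiver
above, the sketch below shows the fault-injection pattern the class follows: a
process-wide instance whose methods are no-ops by default, which a test can swap
in and later restore. This is a simplified, self-contained illustration; the class
and method names (FaultInjector, InjectorSketch) and the sample address
"dn3:50010" are stand-ins, not the actual Hadoop code.

    import java.io.IOException;

    // Simplified stand-in for DataNodeFaultInjector: a process-wide instance
    // whose hooks do nothing by default, so production code pays essentially
    // nothing for calling them.
    class FaultInjector {
      private static FaultInjector instance = new FaultInjector();

      public static FaultInjector get() { return instance; }
      public static void set(FaultInjector injector) { instance = injector; }

      // Default hook is a no-op; call sites invoke it unconditionally.
      public void failPipeline(long bytesAcked, String mirrorAddr)
          throws IOException { }
    }

    public class InjectorSketch {
      public static void main(String[] args) {
        FaultInjector old = FaultInjector.get();
        try {
          // A test installs an override that fails only for a chosen mirror.
          FaultInjector.set(new FaultInjector() {
            @Override
            public void failPipeline(long bytesAcked, String mirrorAddr)
                throws IOException {
              if ("dn3:50010".equals(mirrorAddr)) {
                throw new IOException("injected pipeline failure");
              }
            }
          });
          try {
            // Stand-in for the production call site in the PacketResponder.
            FaultInjector.get().failPipeline(700L, "dn3:50010");
          } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
          }
        } finally {
          FaultInjector.set(old);  // always restore, as the real test does
        }
      }
    }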

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f10a0a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index de5b603..88c0793 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1452,7 +1452,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
             throw new MustStopExistingWriter(rbw);
           }
-          LOG.info("Recovering " + rbw);
+          LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw);
           return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
         }
       } catch (MustStopExistingWriter e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f10a0a8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 5e320fa..99b617e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -17,9 +17,16 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -31,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -39,12 +48,15 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This tests whether the pipeline recovery related client protocol works correctly.
  */
 public class TestClientProtocolForPipelineRecovery {
-  
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestClientProtocolForPipelineRecovery.class);
   @Test public void testGetNewStamp() throws IOException {
     int numDataNodes = 1;
     Configuration conf = new HdfsConfiguration();
@@ -477,4 +489,128 @@ public class TestClientProtocolForPipelineRecovery {
       DataNodeFaultInjector.set(oldDnInjector);
     }
   }
+
+  // Test to verify that blocks are no longer corrupted after HDFS-4660.
+  // If HDFS-4660 and the related fixes (HDFS-9220, HDFS-8722) were reverted,
+  // this test would fail.
+  // Scenario: prior to the fix, a block got corrupted when transferBlock
+  // happened during pipeline recovery and extra bytes were copied to make up
+  // the end of a chunk.
+  // For verification, we need to fail the pipeline to the last datanode when
+  // the second datanode has more bytes on disk than have been acked.
+  // This allows extra bytes to be transferred to the newNode to make up the
+  // end of the chunk during pipeline recovery, and is achieved by the
+  // customized DataNodeFaultInjector class in this test.
+  // For details, please refer to HDFS-4660 and HDFS-10587. HDFS-9220 fixes
+  // an issue in the HDFS-4660 patch, and HDFS-8722 is an optimization.
+  @Test
+  public void testPipelineRecoveryWithTransferBlock() throws Exception {
+    final int chunkSize = 512;
+    final int oneWriteSize = 5000;
+    final int totalSize = 1024 * 1024;
+    final int errorInjectionPos = 512;
+    Configuration conf = new HdfsConfiguration();
+    // Need 4 datanodes to verify the replaceDatanode during pipeline recovery
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+    DataNodeFaultInjector old = DataNodeFaultInjector.get();
+
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/f");
+      FSDataOutputStream o = fs.create(fileName);
+      int count = 0;
+      // Flush to get the pipeline created.
+      o.writeBytes("hello");
+      o.hflush();
+      DFSOutputStream dfsO = (DFSOutputStream) o.getWrappedStream();
+      final DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes();
+      final String lastDn = pipeline[2].getXferAddr(false);
+      final AtomicBoolean failed = new AtomicBoolean(false);
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+            String mirror) throws IOException {
+          if (!lastDn.equals(mirror)) {
+            // Only fail for second DN
+            return;
+          }
+          if (!failed.get() &&
+              (replicaInfo.getBytesAcked() > errorInjectionPos) &&
+              (replicaInfo.getBytesAcked() % chunkSize != 0)) {
+            int count = 0;
+            while (count < 10) {
+              // Fail the pipeline (throw an exception) when:
+              //   1. bytesAcked is not at a chunk boundary (checked in the
+              //      if statement above)
+              //   2. bytesOnDisk is bigger than bytesAcked and at least
+              //      reaches (or goes beyond) the end of the chunk that
+              //      bytesAcked is in (checked in the if statement below).
+              // Under this condition, the transferBlock that happens during
+              // pipeline recovery transfers extra bytes to make up the end
+              // of the chunk, and this is when the block corruption
+              // described in HDFS-4660 would occur.
+              if ((replicaInfo.getBytesOnDisk() / chunkSize) -
+                  (replicaInfo.getBytesAcked() / chunkSize) >= 1) {
+                failed.set(true);
+                throw new IOException(
+                    "Failing Pipeline " + replicaInfo.getBytesAcked() + " : "
+                        + replicaInfo.getBytesOnDisk());
+              }
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException e) {
+              }
+              count++;
+            }
+          }
+        }
+      });
+
+      Random r = new Random();
+      byte[] b = new byte[oneWriteSize];
+      while (count < totalSize) {
+        r.nextBytes(b);
+        o.write(b);
+        count += oneWriteSize;
+        o.hflush();
+      }
+
+      assertTrue("Expected a failure in the pipeline", failed.get());
+      DatanodeInfo[] newNodes = dfsO.getStreamer().getNodes();
+      o.close();
+      // Trigger block report to NN
+      for (DataNode d: cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(d);
+      }
+      // Read from the replaced datanode to verify there is no corruption. To
+      // do that, shut down all other nodes in the pipeline.
+      List<DatanodeInfo> pipelineList = Arrays.asList(pipeline);
+      DatanodeInfo newNode = null;
+      for (DatanodeInfo node : newNodes) {
+        if (!pipelineList.contains(node)) {
+          newNode = node;
+          break;
+        }
+      }
+      LOG.info("Number of nodes in pipeline: {} newNode {}",
+          newNodes.length, newNode.getName());
+      // shut down the old nodes that remain in the pipeline
+      for (int i = 0; i < newNodes.length; i++) {
+        if (newNodes[i].getName().equals(newNode.getName())) {
+          continue;
+        }
+        LOG.info("shutdown {}", newNodes[i].getName());
+        cluster.stopDataNode(newNodes[i].getName());
+      }
+
+      // The read should succeed from only the newNode, and no corruption
+      // should be reported.
+      DFSTestUtil.readFile(fs, fileName);
+    } finally {
+      DataNodeFaultInjector.set(old);
+      cluster.shutdown();
+    }
+  }
 }
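
To make the injected-failure condition in the test above concrete (bytesAcked past
errorInjectionPos and not on a chunk boundary, while bytesOnDisk has reached at
least the end of that chunk), here is a small standalone walkthrough of the
arithmetic. The byte counts are made-up illustrative values, not taken from an
actual run of the test.

    public class ChunkBoundaryCheck {
      public static void main(String[] args) {
        final int chunkSize = 512;          // same chunk size the test uses
        final int errorInjectionPos = 512;

        long bytesAcked = 700;              // past errorInjectionPos, mid-chunk
        long bytesOnDisk = 1100;            // a full chunk ahead of the acked position

        // 700 % 512 = 188, so the acked position is not at a chunk boundary.
        boolean notAtChunkBoundary =
            bytesAcked > errorInjectionPos && bytesAcked % chunkSize != 0;
        // 1100 / 512 = 2 and 700 / 512 = 1, so the on-disk data reaches the end
        // of the chunk that the acked position is in.
        boolean diskReachesEndOfAckedChunk =
            (bytesOnDisk / chunkSize) - (bytesAcked / chunkSize) >= 1;

        System.out.println(notAtChunkBoundary && diskReachesEndOfAckedChunk);  // true
      }
    }

With values like these the injector throws, pipeline recovery kicks in while the
second datanode holds a partial chunk beyond the acked offset, and transferBlock
has to copy those extra bytes to the replacement node, which is the situation in
which the HDFS-4660 corruption used to appear.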



