hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kih...@apache.org
Subject hadoop git commit: HDFS-7704. DN heartbeat to Active NN may be blocked and expire if connection to Standby NN continues to time out. Contributed by Rushabh Shah.
Date Thu, 12 Feb 2015 15:16:24 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 7bc492ada -> 38262779b


HDFS-7704. DN heartbeat to Active NN may be blocked and expire if connection to Standby NN
continues to time out. Contributed by Rushabh Shah.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/38262779
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/38262779
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/38262779

Branch: refs/heads/trunk
Commit: 38262779bbf38a427bad6d044e91165567f1d206
Parents: 7bc492a
Author: Kihwal Lee <kihwal@apache.org>
Authored: Thu Feb 12 09:15:30 2015 -0600
Committer: Kihwal Lee <kihwal@apache.org>
Committed: Thu Feb 12 09:15:30 2015 -0600

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/datanode/BPOfferService.java    |   8 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  61 +++----
 .../server/datanode/TestBPOfferService.java     | 157 +++++++++++++++++++
 4 files changed, 197 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/38262779/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4c32a36..cd43bd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -929,6 +929,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7771. fuse_dfs should permit FILE: on the front of KRB5CCNAME
     (cmccabe)
 
+    HDFS-7704. DN heartbeat to Active NN may be blocked and expire if
+    connection to Standby NN continues to time out (Rushabh Shah via kihwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38262779/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index dfeacde..0289167 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -218,7 +218,9 @@ class BPOfferService {
                        String storageUuid, StorageType storageType) {
     checkBlock(block);
     for (BPServiceActor actor : bpServices) {
-      actor.reportBadBlocks(block, storageUuid, storageType);
+      ReportBadBlockAction rbbAction = new ReportBadBlockAction
+          (block, storageUuid, storageType);
+      actor.bpThreadEnqueue(rbbAction);
     }
   }
   
@@ -414,7 +416,9 @@ class BPOfferService {
    */
   void trySendErrorReport(int errCode, String errMsg) {
     for (BPServiceActor actor : bpServices) {
-      actor.trySendErrorReport(errCode, errMsg);
+      ErrorReportAction errorReportAction = new ErrorReportAction 
+          (errCode, errMsg);
+      actor.bpThreadEnqueue(errorReportAction);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38262779/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 917b5dd..69210db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -34,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -120,6 +120,8 @@ class BPServiceActor implements Runnable {
   private final DNConf dnConf;
 
   private DatanodeRegistration bpRegistration;
+  final LinkedList<BPServiceActorAction> bpThreadQueue 
+      = new LinkedList<BPServiceActorAction>();
 
   BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
     this.bpos = bpos;
@@ -254,26 +256,6 @@ class BPServiceActor implements Runnable {
     resetBlockReportTime = true; // reset future BRs for randomness
   }
 
-  void reportBadBlocks(ExtendedBlock block,
-      String storageUuid, StorageType storageType) {
-    if (bpRegistration == null) {
-      return;
-    }
-    DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-    String[] uuids = { storageUuid };
-    StorageType[] types = { storageType };
-    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
-    
-    try {
-      bpNamenode.reportBadBlocks(blocks);  
-    } catch (IOException e){
-      /* One common reason is that NameNode could be in safe mode.
-       * Should we keep on retrying in that case?
-       */
-      LOG.warn("Failed to report bad block " + block + " to namenode : "
-          + " Exception", e);
-    }
-  }
   
   /**
    * Report received blocks and delete hints to the Namenode for each
@@ -771,6 +753,7 @@ class BPServiceActor implements Runnable {
       } catch (IOException e) {
         LOG.warn("IOException in offerService", e);
       }
+      processQueueMessages();
     } // while (shouldRun())
   } // offerService
 
@@ -909,14 +892,6 @@ class BPServiceActor implements Runnable {
     return true;
   }
 
-  void trySendErrorReport(int errCode, String errMsg) {
-    try {
-      bpNamenode.errorReport(bpRegistration, errCode, errMsg);
-    } catch(IOException e) {
-      LOG.warn("Error reporting an error to NameNode " + nnAddr,
-          e);
-    }
-  }
 
   /**
    * Report a bad block from another DN in this cluster.
@@ -1018,4 +993,30 @@ class BPServiceActor implements Runnable {
       }
     }
   }
-}
+  
+  public void bpThreadEnqueue(BPServiceActorAction action) {
+    synchronized (bpThreadQueue) {
+      if (!bpThreadQueue.contains(action)) {
+        bpThreadQueue.add(action);
+      }
+    }
+  }
+
+  private void processQueueMessages() {
+    LinkedList<BPServiceActorAction> duplicateQueue;
+    synchronized (bpThreadQueue) {
+      duplicateQueue = new LinkedList<BPServiceActorAction>(bpThreadQueue);
+      bpThreadQueue.clear();
+    }
+    while (!duplicateQueue.isEmpty()) {
+      BPServiceActorAction actionItem = duplicateQueue.remove();
+      try {
+        actionItem.reportTo(bpNamenode, bpRegistration);
+      } catch (BPServiceActorActionException baae) {
+        LOG.warn(baae.getMessage() + nnAddr , baae);
+        // Adding it back to the queue if not present
+        bpThreadEnqueue(actionItem);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38262779/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 7171c49..e21ce38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,6 +38,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.Before;
 import org.junit.Test;
@@ -74,6 +77,8 @@ public class TestBPOfferService {
   private static final ExtendedBlock FAKE_BLOCK =
     new ExtendedBlock(FAKE_BPID, 12345L);
   private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
+  private long firstCallTime = 0; 
+  private long secondCallTime = 0;
 
   static {
     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
@@ -458,4 +463,156 @@ public class TestBPOfferService {
     return captor.getValue()[0].getBlocks();
   }
 
+  private void setTimeForSynchronousBPOSCalls() {
+    if (firstCallTime == 0) {
+      firstCallTime = Time.now();
+    } else {
+      secondCallTime = Time.now();
+    }
+  }
+  
+  private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
+    private final int nnIdx;
+
+    public BPOfferServiceSynchronousCallAnswer(int nnIdx) {
+      this.nnIdx = nnIdx;
+    }
+
+    // For active namenode we will record the processTime and for standby
+    // namenode we will sleep for 5 seconds (This will simulate the situation
+    // where the standby namenode is down ) .
+    @Override
+    public Void answer(InvocationOnMock invocation) throws Throwable {
+      if (nnIdx == 0) {
+        setTimeForSynchronousBPOSCalls();
+      } else {
+        Thread.sleep(5000);
+      }
+      return null;
+    }
+   }
+
+  /**
+   * This test case test the {@link BPOfferService#reportBadBlocks} method
+   * such that if call to standby namenode times out then that should not 
+   * affect the active namenode heartbeat processing since this function 
+   * are in writeLock.
+   * @throws Exception
+   */
+  @Test
+  public void testReportBadBlockWhenStandbyNNTimesOut() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+         .when(mockNN1).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+         .when(mockNN2).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+      bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageType());
+      bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageType());
+      Thread.sleep(10000);
+      long difference = secondCallTime - firstCallTime;
+      assertTrue("Active namenode reportBadBlock processing should be "
+          + "independent of standby namenode reportBadBlock processing ",
+          difference < 5000);
+    } finally {
+      bpos.stop();
+    }
+  }
+
+  /**
+   * This test case test the {@link BPOfferService#trySendErrorReport} method
+   * such that if call to standby namenode times out then that should not 
+   * affect the active namenode heartbeat processing since this function 
+   * are in writeLock.
+   * @throws Exception
+   */
+  @Test
+  public void testTrySendErrorReportWhenStandbyNNTimesOut() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+          .when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+          .when(mockNN2).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      String errorString = "Can't send invalid block " + FAKE_BLOCK;
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      Thread.sleep(10000);
+      long difference = secondCallTime - firstCallTime;
+      assertTrue("Active namenode trySendErrorReport processing "
+          + "should be independent of standby namenode trySendErrorReport"
+          + " processing ", difference < 5000);
+    } finally {
+      bpos.stop();
+    }
+  }
+  /**
+   * This test case tests whether the {@BPServiceActor#processQueueMessages}
+   * adds back the error report back to the queue when 
+   * {BPServiceActorAction#reportTo} throws an IOException
+   * @throws Exception
+   */
+  @Test
+  public void testTrySendErrorReportWhenNNThrowsIOException() 
+      throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new Answer<Void>() {
+        // Throw an IOException when this function is first called which will
+        // in turn add that errorReport back to the bpThreadQueue and let it 
+        // process the next time. 
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          if (firstCallTime == 0) {
+            firstCallTime = Time.now();
+            throw new IOException();
+          } else {
+            secondCallTime = Time.now();
+            return null;
+          }
+        }
+      }).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      String errorString = "Can't send invalid block " + FAKE_BLOCK;
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      Thread.sleep(10000);
+      assertTrue("Active namenode didn't add the report back to the queue "
+          + "when errorReport threw IOException", secondCallTime != 0);
+    } finally {
+      bpos.stop();
+    }
+  } 
 }


Mime
View raw message