hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l..@apache.org
Subject hadoop git commit: HDFS-4176. EditLogTailer should call rollEdits with a timeout. (lei)
Date Mon, 08 Aug 2016 23:33:42 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 9cb6d291e -> e7c701586


HDFS-4176. EditLogTailer should call rollEdits with a timeout. (lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e7c70158
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e7c70158
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e7c70158

Branch: refs/heads/branch-2
Commit: e7c701586d8cfac73101b3358707c5ad22937879
Parents: 9cb6d29
Author: Lei Xu <lei@apache.org>
Authored: Mon Aug 8 16:32:01 2016 -0700
Committer: Lei Xu <lei@apache.org>
Committed: Mon Aug 8 16:32:01 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  3 +
 .../hdfs/server/namenode/ha/EditLogTailer.java  | 73 +++++++++++++++++---
 .../src/main/resources/hdfs-default.xml         |  7 ++
 .../server/namenode/ha/TestEditLogTailer.java   | 45 ++++++++++++
 4 files changed, 118 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7c70158/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f31eb0a..602c694 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -689,6 +689,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m
   public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
   public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
+  public static final String DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY =
+      "dfs.ha.tail-edits.rolledits.timeout";
+  public static final int DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT = 60; // 1m
   public static final String DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout";
   public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s
   public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7c70158/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index 5730de4..e13b501 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -24,6 +24,15 @@ import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -96,6 +105,17 @@ public class EditLogTailer {
   private final long logRollPeriodMs;
 
   /**
+   * The timeout in milliseconds of calling rollEdits RPC to Active NN.
+   * @see HDFS-4176.
+   */
+  private final long rollEditsTimeoutMs;
+
+  /**
+   * The executor to run roll edit RPC call in a daemon thread.
+   */
+  private final ExecutorService rollEditsRpcExecutor;
+
+  /**
    * How often the Standby should check if there are new finalized segment(s)
    * available to be read from.
    */
@@ -125,7 +145,14 @@ public class EditLogTailer {
     
     sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;
-    
+
+    rollEditsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT) * 1000;
+
+    rollEditsRpcExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).build());
+
     LOG.debug("logRollPeriodMs=" + logRollPeriodMs +
         " sleepTime=" + sleepTimeMs);
   }
@@ -154,6 +181,7 @@ public class EditLogTailer {
   }
   
   public void stop() throws IOException {
+    rollEditsRpcExecutor.shutdown();
     tailerThread.setShouldRun(false);
     tailerThread.interrupt();
     try {
@@ -173,7 +201,7 @@ public class EditLogTailer {
   public void setEditLog(FSEditLog editLog) {
     this.editLog = editLog;
   }
-  
+
   public void catchupDuringFailover() throws IOException {
     Preconditions.checkState(tailerThread == null ||
         !tailerThread.isAlive(),
@@ -267,24 +295,49 @@ public class EditLogTailer {
   }
 
   /**
+   * @return a Callable to roll logs on remote NameNode.
+   */
+  @VisibleForTesting
+  Callable<Void> getRollEditsTask() {
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getActiveNodeProxy().rollEditLog();
+        return null;
+      }
+    };
+  }
+
+  /**
    * Trigger the active node to roll its logs.
    */
-  private void triggerActiveLogRoll() {
-    LOG.info("Triggering log roll on remote NameNode " + activeAddr);
+  @VisibleForTesting
+  void triggerActiveLogRoll() {
+    LOG.info("Triggering log roll on remote NameNode");
+    Future<Void> future = null;
     try {
-      getActiveNodeProxy().rollEditLog();
+      future = rollEditsRpcExecutor.submit(getRollEditsTask());
+      future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
       lastRollTriggerTxId = lastLoadedTxnId;
-    } catch (IOException ioe) {
-      if (ioe instanceof RemoteException) {
-        ioe = ((RemoteException)ioe).unwrapRemoteException();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof RemoteException) {
+        IOException ioe = ((RemoteException) cause).unwrapRemoteException();
         if (ioe instanceof StandbyException) {
           LOG.info("Skipping log roll. Remote node is not in Active state: " +
               ioe.getMessage().split("\n")[0]);
           return;
         }
       }
-
-      LOG.warn("Unable to trigger a roll of the active NN", ioe);
+      LOG.warn("Unable to trigger a roll of the active NN", e);
+    } catch (TimeoutException e) {
+      if (future != null) {
+        future.cancel(true);
+      }
+      LOG.warn(String.format(
+          "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs));
+    } catch (InterruptedException e) {
+      LOG.warn("Unable to trigger a roll of the active NN", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7c70158/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1692d18..a893add 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1476,6 +1476,13 @@
 </property>
 
 <property>
+  <name>dfs.ha.tail-edits.rolledits.timeout</name>
+  <value>60</value>
+  <description>The timeout in seconds of calling rollEdits RPC on Active NN.
+  </description>
+</property>
+
+<property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>false</value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7c70158/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index c400a09..1d13bbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -48,6 +52,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Supplier;
+import org.mockito.Mockito;
 
 @RunWith(Parameterized.class)
 public class TestEditLogTailer {
@@ -194,4 +199,44 @@ public class TestEditLogTailer {
       }
     }, 100, 10000);
   }
+
+  @Test(timeout=20000)
+  public void testRollEditTimeoutForActiveNN() throws IOException {
+    Configuration conf = getConf();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 5); // 5s
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
+    HAUtil.setAllowStandbyReads(conf, true);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(0)
+        .build();
+    cluster.waitActive();
+
+    cluster.transitionToActive(0);
+
+    try {
+      EditLogTailer tailer = Mockito.spy(
+          cluster.getNamesystem(1).getEditLogTailer());
+      final AtomicInteger flag = new AtomicInteger(0);
+
+      // Return a slow roll edit process.
+      when(tailer.getRollEditsTask()).thenReturn(
+          new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              Thread.sleep(30000);  // sleep for 30 seconds.
+              assertTrue(Thread.currentThread().isInterrupted());
+              flag.addAndGet(1);
+              return null;
+            }
+          }
+      );
+      tailer.triggerActiveLogRoll();
+      assertEquals(0, flag.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message