hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xg...@apache.org
Subject hadoop git commit: YARN-4496. Improve HA ResourceManager Failover detection on the client. Contributed by Jian He
Date Sat, 23 Jan 2016 02:21:34 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 46e5ea81e -> 618bfd6ac


YARN-4496. Improve HA ResourceManager Failover detection on the client.
Contributed by Jian He


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/618bfd6a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/618bfd6a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/618bfd6a

Branch: refs/heads/trunk
Commit: 618bfd6ac2a5b62d39e9bed80f75362bafc0ef28
Parents: 46e5ea8
Author: Xuan <xgong@apache.org>
Authored: Fri Jan 22 18:20:38 2016 -0800
Committer: Xuan <xgong@apache.org>
Committed: Fri Jan 22 18:20:38 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 ...stHedgingRequestRMFailoverProxyProvider.java |  98 ++++++++++
 .../ConfiguredRMFailoverProxyProvider.java      |   6 +-
 .../org/apache/hadoop/yarn/client/RMProxy.java  |  33 ++--
 .../RequestHedgingRMFailoverProxyProvider.java  | 194 +++++++++++++++++++
 .../nodemanager/TestNodeStatusUpdater.java      |   7 +-
 .../hadoop/yarn/server/MiniYARNCluster.java     |   1 +
 7 files changed, 323 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/618bfd6a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f840a9e..8e87f4a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -104,6 +104,9 @@ Release 2.9.0 - UNRELEASED
     YARN-4603. FairScheduler should mention user requested queuename in error 
     message when failed in queue ACL check. (Tao Jie via kasha)
 
+    YARN-4496. Improve HA ResourceManager Failover detection on the client.
+    (Jian He via xgong)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/618bfd6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
new file mode 100644
index 0000000..6fd6591
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestHedgingRequestRMFailoverProxyProvider {
+
+  @Test
+  public void testHedgingRequestProxyProvider() throws Exception {
+    final MiniYARNCluster cluster =
+        new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1);
+    Configuration conf = new YarnConfiguration();
+
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+    conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3,rm4,rm5");
+
+    conf.set(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
+        RequestHedgingRMFailoverProxyProvider.class.getName());
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
+        2000);
+
+    HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
+    HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
+    HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
+    HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
+    HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+
+    cluster.init(conf);
+    cluster.start();
+
+    final YarnClient client = YarnClient.createYarnClient();
+    client.init(conf);
+    client.start();
+
+    // Transition rm5 to active;
+    long start = System.currentTimeMillis();
+    makeRMActive(cluster, 4);
+    // client will retry until the rm becomes active.
+    client.getAllQueues();
+    long end = System.currentTimeMillis();
+    System.out.println("Client call succeeded at " + end);
+    // should return the response fast
+    Assert.assertTrue(end - start <= 10000);
+
+    // transition rm5 to standby
+    cluster.getResourceManager(4).getRMContext().getRMAdminService()
+        .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER));
+
+    makeRMActive(cluster, 2);
+    client.getAllQueues();
+    cluster.stop();
+  }
+
+  private void makeRMActive(final MiniYARNCluster cluster, final int index) {
+    Thread t = new Thread() {
+      @Override public void run() {
+        try {
+          System.out.println("Transition rm" + index + " to active");
+          cluster.getResourceManager(index).getRMContext().getRMAdminService()
+              .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
+                  HAServiceProtocol.RequestSource.REQUEST_BY_USER));
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    t.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/618bfd6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
index 5577d20..8676db2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
@@ -45,8 +45,8 @@ public class ConfiguredRMFailoverProxyProvider<T>
   private int currentProxyIndex = 0;
   Map<String, T> proxies = new HashMap<String, T>();
 
-  private RMProxy<T> rmProxy;
-  private Class<T> protocol;
+  protected RMProxy<T> rmProxy;
+  protected Class<T> protocol;
   protected YarnConfiguration conf;
   protected String[] rmServiceIds;
 
@@ -71,7 +71,7 @@ public class ConfiguredRMFailoverProxyProvider<T>
             YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
   }
 
-  private T getProxyInternal() {
+  protected T getProxyInternal() {
     try {
       final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
       return RMProxy.getProxy(conf, protocol, rmAddress);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/618bfd6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
index 3779ce5..3ab06bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -77,6 +78,7 @@ public class RMProxy<T> {
   }
 
   /**
+   * Currently, used by Client and AM only
    * Create a proxy for the specified protocol. For non-HA,
    * this is a direct connection to the ResourceManager address. When HA is
    * enabled, the proxy handles the failover between the ResourceManagers as
@@ -88,12 +90,12 @@ public class RMProxy<T> {
     YarnConfiguration conf = (configuration instanceof YarnConfiguration)
         ? (YarnConfiguration) configuration
         : new YarnConfiguration(configuration);
-    RetryPolicy retryPolicy =
-        createRetryPolicy(conf);
-    return createRMProxy(conf, protocol, instance, retryPolicy);
+    RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
+    return newProxyInstance(conf, protocol, instance, retryPolicy);
   }
 
   /**
+   * Currently, used by NodeManagers only.
    * Create a proxy for the specified protocol. For non-HA,
    * this is a direct connection to the ResourceManager address. When HA is
    * enabled, the proxy handles the failover between the ResourceManagers as
@@ -106,12 +108,12 @@ public class RMProxy<T> {
     YarnConfiguration conf = (configuration instanceof YarnConfiguration)
         ? (YarnConfiguration) configuration
         : new YarnConfiguration(configuration);
-    RetryPolicy retryPolicy =
-        createRetryPolicy(conf, retryTime, retryInterval);
-    return createRMProxy(conf, protocol, instance, retryPolicy);
+    RetryPolicy retryPolicy = createRetryPolicy(conf, retryTime, retryInterval,
+        HAUtil.isHAEnabled(conf));
+    return newProxyInstance(conf, protocol, instance, retryPolicy);
   }
 
-  private static <T> T createRMProxy(final YarnConfiguration conf,
+  private static <T> T newProxyInstance(final YarnConfiguration conf,
       final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
           throws IOException{
     if (HAUtil.isHAEnabled(conf)) {
@@ -144,7 +146,7 @@ public class RMProxy<T> {
   @Deprecated
   public static <T> T createRMProxy(final Configuration conf,
       final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
-    RetryPolicy retryPolicy = createRetryPolicy(conf);
+    RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
     T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
     LOG.info("Connecting to ResourceManager at " + rmAddress);
     return (T) RetryProxy.create(protocol, proxy, retryPolicy);
@@ -194,7 +196,8 @@ public class RMProxy<T> {
    */
   @Private
   @VisibleForTesting
-  public static RetryPolicy createRetryPolicy(Configuration conf) {
+  public static RetryPolicy createRetryPolicy(Configuration conf,
+      boolean isHAEnabled) {
     long rmConnectWaitMS =
         conf.getLong(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
@@ -204,16 +207,17 @@ public class RMProxy<T> {
             YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
             YarnConfiguration
                 .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
-    return createRetryPolicy(
-        conf, rmConnectWaitMS, rmConnectionRetryIntervalMS);
+
+    return createRetryPolicy(conf, rmConnectWaitMS, rmConnectionRetryIntervalMS,
+        isHAEnabled);
   }
 
   /**
    * Fetch retry policy from Configuration and create the
    * retry policy with specified retryTime and retry interval.
    */
-  private static RetryPolicy createRetryPolicy(Configuration conf,
-      long retryTime, long retryInterval) {
+  protected static RetryPolicy createRetryPolicy(Configuration conf,
+      long retryTime, long retryInterval, boolean isHAEnabled) {
     long rmConnectWaitMS = retryTime;
     long rmConnectionRetryIntervalMS = retryInterval;
 
@@ -236,7 +240,7 @@ public class RMProxy<T> {
     }
 
     // Handle HA case first
-    if (HAUtil.isHAEnabled(conf)) {
+    if (isHAEnabled) {
       final long failoverSleepBaseMs = conf.getLong(
           YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
           rmConnectionRetryIntervalMS);
@@ -287,6 +291,7 @@ public class RMProxy<T> {
     exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
     exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
     exceptionToPolicyMap.put(SocketException.class, retryPolicy);
+    exceptionToPolicyMap.put(StandbyException.class, retryPolicy);
     // YARN-4288: local IOException is also possible.
     exceptionToPolicyMap.put(IOException.class, retryPolicy);
     // Not retry on remote IO exception.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/618bfd6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
new file mode 100644
index 0000000..dc8d19b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.MultiException;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * A FailoverProxyProvider implementation that technically does not "failover"
+ * per-se. It constructs a wrapper proxy that sends the request to ALL
+ * underlying proxies simultaneously. Each proxy inside the wrapper proxy will
+ * retry the corresponding target. It assumes the in an HA setup, there will
+ * be only one Active, and the active should respond faster than any configured
+ * standbys. Once it receives a response from any one of the configred proxies,
+ * outstanding requests to other proxies are immediately cancelled.
+ */
+public class RequestHedgingRMFailoverProxyProvider<T>
+    extends ConfiguredRMFailoverProxyProvider<T> {
+
+  private static final Log LOG =
+      LogFactory.getLog(RequestHedgingRMFailoverProxyProvider.class);
+
+  private volatile String successfulProxy = null;
+  private ProxyInfo<T> wrappedProxy = null;
+  private Map<String, T> nonRetriableProxy = new HashMap<>();
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void init(Configuration configuration, RMProxy<T> rmProxy,
+      Class<T> protocol) {
+    super.init(configuration, rmProxy, protocol);
+    Map<String, ProxyInfo<T>> retriableProxies = new HashMap<>();
+
+    String originalId = HAUtil.getRMHAId(conf);
+    for (String rmId : rmServiceIds) {
+      conf.set(YarnConfiguration.RM_HA_ID, rmId);
+      nonRetriableProxy.put(rmId, super.getProxyInternal());
+
+      T proxy = createRetriableProxy();
+      ProxyInfo<T> pInfo = new ProxyInfo<T>(proxy, rmId);
+      retriableProxies.put(rmId, pInfo);
+    }
+    conf.set(YarnConfiguration.RM_HA_ID, originalId);
+
+    T proxyInstance = (T) Proxy.newProxyInstance(
+        RMRequestHedgingInvocationHandler.class.getClassLoader(),
+        new Class<?>[] {protocol},
+        new RMRequestHedgingInvocationHandler(retriableProxies));
+    String combinedInfo = Arrays.toString(rmServiceIds);
+    wrappedProxy = new ProxyInfo<T>(proxyInstance, combinedInfo);
+    LOG.info("Created wrapped proxy for " + combinedInfo);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected T createRetriableProxy() {
+    try {
+      // Create proxy that can retry exceptions properly.
+      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
+      InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
+      return (T) RetryProxy.create(protocol, proxy, retryPolicy);
+    } catch (IOException ioe) {
+      LOG.error("Unable to create proxy to the ResourceManager " + HAUtil
+          .getRMHAId(conf), ioe);
+      return null;
+    }
+  }
+
+  class RMRequestHedgingInvocationHandler implements InvocationHandler {
+
+    final private Map<String, ProxyInfo<T>> allProxies;
+
+    public RMRequestHedgingInvocationHandler(
+        Map<String, ProxyInfo<T>> allProxies) {
+      this.allProxies = new HashMap<>(allProxies);
+    }
+
+    protected Object invokeMethod(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      try {
+        return method.invoke(proxy, args);
+      } catch (InvocationTargetException ex) {
+        throw ex.getCause();
+      }
+    }
+
+    /**
+     * Creates a Executor and invokes all proxies concurrently.
+     */
+    @Override
+    public Object invoke(Object proxy, final Method method,
+        final Object[] args) throws Throwable {
+      if (successfulProxy != null) {
+        return invokeMethod(nonRetriableProxy.get(successfulProxy), method, args);
+      }
+
+      ExecutorService executor = null;
+      CompletionService<Object> completionService;
+      try {
+        Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
+        int numAttempts = 0;
+        executor = Executors.newFixedThreadPool(allProxies.size());
+        completionService = new ExecutorCompletionService<>(executor);
+        for (final ProxyInfo<T> pInfo : allProxies.values()) {
+          Callable<Object> c = new Callable<Object>() {
+            @Override public Object call() throws Exception {
+              return method.invoke(pInfo.proxy, args);
+            }
+          };
+          proxyMap.put(completionService.submit(c), pInfo);
+          numAttempts++;
+        }
+
+        Map<String, Exception> badResults = new HashMap<>();
+        while (numAttempts > 0) {
+          Future<Object> callResultFuture = completionService.take();
+          String pInfo = proxyMap.get(callResultFuture).proxyInfo;
+          Object retVal;
+          try {
+            retVal = callResultFuture.get();
+            successfulProxy = pInfo;
+            LOG.info("Invocation successful on [" + pInfo + "]");
+            return retVal;
+          } catch (Exception ex) {
+            LOG.warn("Invocation returned exception on " + "[" + pInfo + "]");
+            badResults.put(pInfo, ex);
+            numAttempts--;
+          }
+        }
+
+        // At this point we should have All bad results (Exceptions)
+        // Or should have returned with successful result.
+        if (badResults.size() == 1) {
+          throw badResults.values().iterator().next();
+        } else {
+          throw new MultiException(badResults);
+        }
+      } finally {
+        if (executor != null) {
+          executor.shutdownNow();
+        }
+      }
+    }
+  }
+
+  @Override
+  public ProxyInfo<T> getProxy() {
+    return wrappedProxy;
+  }
+
+  @Override
+  public void performFailover(T currentProxy) {
+    LOG.info("Connection lost, trying to fail over.");
+    successfulProxy = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/618bfd6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 90804b8..a8066c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.RMProxy;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -443,7 +444,8 @@ public class TestNodeStatusUpdater {
 
     @Override
     protected ResourceTracker getRMClient() throws IOException {
-      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf,
+          HAUtil.isHAEnabled(conf));
       resourceTracker =
           (ResourceTracker) RetryProxy.create(ResourceTracker.class,
             new MyResourceTracker6(rmStartIntervalMS, rmNeverStart),
@@ -476,7 +478,8 @@ public class TestNodeStatusUpdater {
 
     @Override
     protected ResourceTracker getRMClient() {
-      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
+      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf,
+          HAUtil.isHAEnabled(conf));
       return (ResourceTracker) RetryProxy.create(ResourceTracker.class,
         resourceTracker, retryPolicy);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/618bfd6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 68c9efd..630b7ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -457,6 +457,7 @@ public class MiniYARNCluster extends CompositeService {
     protected synchronized void serviceStart() throws Exception {
       startResourceManager(index);
       Configuration conf = resourceManagers[index].getConfig();
+      LOG.info("Starting resourcemanager " + index);
       LOG.info("MiniYARN ResourceManager address: " +
           conf.get(YarnConfiguration.RM_ADDRESS));
       LOG.info("MiniYARN ResourceManager web address: " + WebAppUtils


Mime
View raw message