hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1551740 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/clien...
Date Tue, 17 Dec 2013 22:33:17 GMT
Author: vinodkv
Date: Tue Dec 17 22:33:16 2013
New Revision: 1551740

URL: http://svn.apache.org/r1551740
Log:
YARN-1028. Added FailoverProxyProvider capability to ResourceManager to help with RM failover.
Contributed by Karthik Kambatla.
svn merge --ignore-ancestry -c 1551739 ../../trunk/

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
      - copied unchanged from r1551739, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
      - copied unchanged from r1551739, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java
      - copied unchanged from r1551739, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Dec 17 22:33:16 2013
@@ -34,6 +34,9 @@ Release 2.4.0 - UNRELEASED
     YARN-312. Introduced ResourceManagerAdministrationProtocol changes to support
     changing resources on node. (Junping Du via vinodkv)
 
+    YARN-1028. Added FailoverProxyProvider capability to ResourceManager to help
+    with RM failover. (Karthik Kambatla via vinodkv)
+
   IMPROVEMENTS
 
     YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Tue Dec 17 22:33:16 2013
@@ -310,4 +310,12 @@
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
 
+  <!-- Ignore INSTANCE not being final as it is created in sub-classes -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.client.RMProxy" />
+    <Field name="INSTANCE" />
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+
 </FindBugsFilter>

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Dec 17 22:33:16 2013
@@ -296,6 +296,31 @@ public class YarnConfiguration extends C
           HttpConfig.isSecure() ? RM_WEBAPP_HTTPS_ADDRESS
               : RM_WEBAPP_ADDRESS));
 
+  public static final String CLIENT_FAILOVER_PREFIX =
+      YARN_PREFIX + "client.failover-";
+  public static final String CLIENT_FAILOVER_PROXY_PROVIDER =
+      CLIENT_FAILOVER_PREFIX + "proxy-provider";
+  public static final String DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER =
+      "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider";
+
+  public static final String CLIENT_FAILOVER_MAX_ATTEMPTS =
+      CLIENT_FAILOVER_PREFIX + "max-attempts";
+
+  public static final String CLIENT_FAILOVER_SLEEPTIME_BASE_MS =
+      CLIENT_FAILOVER_PREFIX + "sleep-base-ms";
+
+  public static final String CLIENT_FAILOVER_SLEEPTIME_MAX_MS =
+      CLIENT_FAILOVER_PREFIX + "sleep-max-ms";
+
+  public static final String CLIENT_FAILOVER_RETRIES =
+      CLIENT_FAILOVER_PREFIX + "retries";
+  public static final int DEFAULT_CLIENT_FAILOVER_RETRIES = 0;
+
+  public static final String CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS =
+      CLIENT_FAILOVER_PREFIX + "retries-on-socket-timeouts";
+  public static final int
+      DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS = 0;
+
   ////////////////////////////////
   // RM state store configs
   ////////////////////////////////
@@ -850,22 +875,31 @@ public class YarnConfiguration extends C
   public static final String IS_MINI_YARN_CLUSTER = YARN_PREFIX
       + "is.minicluster";
 
+  public static final String YARN_MC_PREFIX = YARN_PREFIX + "minicluster.";
+
   /** Whether to use fixed ports with the minicluster. */
-  public static final String YARN_MINICLUSTER_FIXED_PORTS = YARN_PREFIX
-      + "minicluster.fixed.ports";
+  public static final String YARN_MINICLUSTER_FIXED_PORTS =
+      YARN_MC_PREFIX + "fixed.ports";
 
   /**
    * Default is false to be able to run tests concurrently without port
    * conflicts.
    */
-  public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false;
+  public static final boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false;
+
+  /**
+   * Whether the NM should use RPC to connect to the RM. Default is false.
+   * Can be set to true only when using fixed ports.
+   */
+  public static final String YARN_MINICLUSTER_USE_RPC = YARN_MC_PREFIX + "use-rpc";
+  public static final boolean DEFAULT_YARN_MINICLUSTER_USE_RPC = false;
 
   /**
    * Whether users are explicitly trying to control resource monitoring
    * configuration for the MiniYARNCluster. Disabled by default.
    */
   public static final String YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING =
-      YARN_PREFIX + "minicluster.control-resource-monitoring";
+      YARN_MC_PREFIX + "control-resource-monitoring";
   public static final boolean
       DEFAULT_YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING = false;
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java Tue Dec 17 22:33:16 2013
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -34,17 +35,37 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 
-public class ClientRMProxy<T> extends RMProxy<T>  {
+import com.google.common.base.Preconditions;
 
+public class ClientRMProxy<T> extends RMProxy<T>  {
   private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
 
+  private interface ClientRMProtocols extends ApplicationClientProtocol,
+      ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
+    // Add nothing
+  }
+
+  static {
+    INSTANCE = new ClientRMProxy();
+  }
+
+  private ClientRMProxy(){
+    super();
+  }
+
+  /**
+   * Create a proxy to the ResourceManager for the specified protocol.
+   * @param configuration Configuration with all the required information.
+   * @param protocol Client protocol for which proxy is being requested.
+   * @param <T> Type of proxy.
+   * @return Proxy to the ResourceManager for the specified client protocol.
+   * @throws IOException
+   */
   public static <T> T createRMProxy(final Configuration configuration,
       final Class<T> protocol) throws IOException {
-    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
-        ? (YarnConfiguration) configuration
-        : new YarnConfiguration(configuration);
-    InetSocketAddress rmAddress = getRMAddress(conf, protocol);
-    return createRMProxy(conf, protocol, rmAddress);
+    // Exists so that this class's static initializer runs and sets INSTANCE.
+    // TODO: FIX if possible
+    return RMProxy.createRMProxy(configuration, protocol);
   }
 
   private static void setupTokens(InetSocketAddress resourceManagerAddress)
@@ -63,7 +84,9 @@ public class ClientRMProxy<T> extends RM
     }
   }
 
-  private static InetSocketAddress getRMAddress(YarnConfiguration conf,
+  @InterfaceAudience.Private
+  @Override
+  protected InetSocketAddress getRMAddress(YarnConfiguration conf,
       Class<?> protocol) throws IOException {
     if (protocol == ApplicationClientProtocol.class) {
       return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
@@ -89,4 +112,12 @@ public class ClientRMProxy<T> extends RM
       throw new IllegalStateException(message);
     }
   }
+
+  @InterfaceAudience.Private
+  @Override
+  protected void checkAllowedProtocols(Class<?> protocol) {
+    Preconditions.checkArgument(
+        protocol.isAssignableFrom(ClientRMProtocols.class),
+        "RM does not support this client protocol");
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java Tue Dec 17 22:33:16 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -79,7 +78,6 @@ public class YarnClientImpl extends Yarn
   private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
 
   protected ApplicationClientProtocol rmClient;
-  protected InetSocketAddress rmAddress;
   protected long submitPollIntervalMillis;
   private long asyncApiPollIntervalMillis;
 
@@ -89,15 +87,9 @@ public class YarnClientImpl extends Yarn
     super(YarnClientImpl.class.getName());
   }
 
-  private static InetSocketAddress getRmAddress(Configuration conf) {
-    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
-  }
-
   @SuppressWarnings("deprecation")
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    this.rmAddress = getRmAddress(conf);
     asyncApiPollIntervalMillis =
         conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
           YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
@@ -180,9 +172,7 @@ public class YarnClientImpl extends Yarn
       }
     }
 
-
-    LOG.info("Submitted application " + applicationId + " to ResourceManager"
-        + " at " + rmAddress);
+    LOG.info("Submitted application " + applicationId);
     return applicationId;
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java Tue Dec 17 22:33:16 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.retry.RetryP
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -48,7 +50,68 @@ import com.google.common.annotations.Vis
 public class RMProxy<T> {
 
   private static final Log LOG = LogFactory.getLog(RMProxy.class);
+  protected static RMProxy INSTANCE;
 
+  protected RMProxy() {}
+
+  /**
+   * Verify the passed protocol is supported.
+   */
+  @Private
+  protected void checkAllowedProtocols(Class<?> protocol) {}
+
+  /**
+   * Get the ResourceManager address from the provided Configuration for the
+   * given protocol.
+   */
+  @Private
+  protected InetSocketAddress getRMAddress(
+      YarnConfiguration conf, Class<?> protocol) throws IOException {
+    throw new UnsupportedOperationException("This method should be invoked " +
+        "from an instance of ClientRMProxy or ServerRMProxy");
+  }
+
+  /**
+   * Create a proxy for the specified protocol. For non-HA,
+   * this is a direct connection to the ResourceManager address. When HA is
+   * enabled, the proxy handles the failover between the ResourceManagers as
+   * well.
+   */
+  @Private
+  protected static <T> T createRMProxy(final Configuration configuration,
+      final Class<T> protocol) throws IOException {
+    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
+        ? (YarnConfiguration) configuration
+        : new YarnConfiguration(configuration);
+    RetryPolicy retryPolicy = createRetryPolicy(conf);
+    if (HAUtil.isHAEnabled(conf)) {
+      RMFailoverProxyProvider<T> provider =
+          INSTANCE.createRMFailoverProxyProvider(conf, protocol);
+      return (T) RetryProxy.create(protocol, provider, retryPolicy);
+    } else {
+      InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol);
+      LOG.info("Connecting to ResourceManager at " + rmAddress);
+      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
+      return (T) RetryProxy.create(protocol, proxy, retryPolicy);
+    }
+  }
+
+  /**
+   * @deprecated
+   * This method is deprecated and is not used by YARN internally any more.
+   * To create a proxy to the RM, use ClientRMProxy#createRMProxy or
+   * ServerRMProxy#createRMProxy.
+   *
+   * Create a proxy to the ResourceManager at the specified address.
+   *
+   * @param conf Configuration to generate retry policy
+   * @param protocol Protocol for the proxy
+   * @param rmAddress Address of the ResourceManager
+   * @param <T> Type information of the proxy
+   * @return Proxy to the RM
+   * @throws IOException
+   */
+  @Deprecated
   public static <T> T createRMProxy(final Configuration conf,
       final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
     RetryPolicy retryPolicy = createRetryPolicy(conf);
@@ -57,12 +120,16 @@ public class RMProxy<T> {
     return (T) RetryProxy.create(protocol, proxy, retryPolicy);
   }
 
-  private static <T> T getProxy(final Configuration conf,
+  /**
+   * Get a proxy to the RM at the specified address. To be used to create a
+   * RetryProxy.
+   */
+  @Private
+  static <T> T getProxy(final Configuration conf,
       final Class<T> protocol, final InetSocketAddress rmAddress)
       throws IOException {
     return UserGroupInformation.getCurrentUser().doAs(
       new PrivilegedAction<T>() {
-
         @Override
         public T run() {
           return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
@@ -70,6 +137,50 @@ public class RMProxy<T> {
       });
   }
 
+  /**
+   * Helper method to create FailoverProxyProvider.
+   */
+  private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
+      Configuration conf, Class<T> protocol) {
+    Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
+    try {
+      defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
+          Class.forName(
+              YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
+    } catch (Exception e) {
+      throw new YarnRuntimeException("Invalid default failover provider class" +
+          YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
+    }
+
+    RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
+        conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
+            defaultProviderClass, RMFailoverProxyProvider.class), conf);
+    provider.init(conf, (RMProxy<T>) this, protocol);
+    return provider;
+  }
+
+  /**
+   * A RetryPolicy that allows failing over until the given deadline (epoch ms).
+   */
+  private static class FailoverUptoMaximumTimePolicy implements RetryPolicy {
+    private long maxTime;
+
+    FailoverUptoMaximumTimePolicy(long maxTime) {
+      this.maxTime = maxTime;
+    }
+
+    @Override
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isIdempotentOrAtMostOnce) throws Exception {
+      return System.currentTimeMillis() < maxTime
+          ? RetryAction.FAILOVER_AND_RETRY
+          : RetryAction.FAIL;
+    }
+  }
+
+  /**
+   * Fetch retry policy from Configuration
+   */
   @Private
   @VisibleForTesting
   public static RetryPolicy createRetryPolicy(Configuration conf) {
@@ -81,19 +192,10 @@ public class RMProxy<T> {
         conf.getLong(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
             YarnConfiguration
-            .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
-
-    if (rmConnectionRetryIntervalMS < 0) {
-      throw new YarnRuntimeException("Invalid Configuration. " +
-          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
-          " should not be negative.");
-    }
+                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
 
     boolean waitForEver = (rmConnectWaitMS == -1);
-
-    if (waitForEver) {
-      return  RetryPolicies.RETRY_FOREVER;
-    } else {
+    if (!waitForEver) {
       if (rmConnectWaitMS < 0) {
         throw new YarnRuntimeException("Invalid Configuration. "
             + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
@@ -110,18 +212,54 @@ public class RMProxy<T> {
       }
     }
 
+    // Handle HA case first
+    if (HAUtil.isHAEnabled(conf)) {
+      final long failoverSleepBaseMs = conf.getLong(
+          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
+          rmConnectionRetryIntervalMS);
+
+      final long failoverSleepMaxMs = conf.getLong(
+          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
+          rmConnectionRetryIntervalMS);
+
+      int maxFailoverAttempts = conf.getInt(
+          YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
+
+      RetryPolicy basePolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+      if (maxFailoverAttempts == -1) {
+        if (waitForEver) {
+          basePolicy = RetryPolicies.FAILOVER_FOREVER;
+        } else {
+          basePolicy = new FailoverUptoMaximumTimePolicy(
+              System.currentTimeMillis() + rmConnectWaitMS);
+        }
+        maxFailoverAttempts = 0;
+      }
+
+      return RetryPolicies.failoverOnNetworkException(basePolicy,
+          maxFailoverAttempts, failoverSleepBaseMs, failoverSleepMaxMs);
+    }
+
+    if (waitForEver) {
+      return RetryPolicies.RETRY_FOREVER;
+    }
+
+    if (rmConnectionRetryIntervalMS < 0) {
+      throw new YarnRuntimeException("Invalid Configuration. " +
+          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
+          " should not be negative.");
+    }
+
     RetryPolicy retryPolicy =
         RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
-            rmConnectionRetryIntervalMS,
-            TimeUnit.MILLISECONDS);
+            rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
 
     Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
         new HashMap<Class<? extends Exception>, RetryPolicy>();
     exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
     //TO DO: after HADOOP-9576,  IOException can be changed to EOFException
     exceptionToPolicyMap.put(IOException.class, retryPolicy);
-
-    return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-      exceptionToPolicyMap);
+    return RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Tue Dec 17 22:33:16 2013
@@ -425,6 +425,61 @@
   </property>
 
   <property>
+    <description>When HA is enabled, the class to be used by Clients, AMs and
+      NMs to fail over to the Active RM. It should extend
+      org.apache.hadoop.yarn.client.RMFailoverProxyProvider.</description>
+    <name>yarn.client.failover-proxy-provider</name>
+    <value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value>
+  </property>
+
+  <property>
+    <description>When HA is enabled, the max number of times
+      FailoverProxyProvider should attempt failover. When set,
+      this overrides the yarn.resourcemanager.connect.max-wait.ms. When
+      not set, this is inferred from
+      yarn.resourcemanager.connect.max-wait.ms.</description>
+    <name>yarn.client.failover-max-attempts</name>
+    <!--value>15</value-->
+  </property>
+
+  <property>
+    <description>When HA is enabled, the sleep base (in milliseconds) to be
+      used for calculating the exponential delay between failovers. When set,
+      this overrides the yarn.resourcemanager.connect.* settings. When
+      not set, yarn.resourcemanager.connect.retry-interval.ms is used instead.
+    </description>
+    <name>yarn.client.failover-sleep-base-ms</name>
+    <!--value>500</value-->
+  </property>
+
+  <property>
+    <description>When HA is enabled, the maximum sleep time (in milliseconds)
+      between failovers. When set, this overrides the
+      yarn.resourcemanager.connect.* settings. When not set,
+      yarn.resourcemanager.connect.retry-interval.ms is used instead.</description>
+    <name>yarn.client.failover-sleep-max-ms</name>
+    <!--value>15000</value-->
+  </property>
+
+  <property>
+    <description>When HA is enabled, the number of retries per
+      attempt to connect to a ResourceManager. In other words,
+      it is the ipc.client.connect.max.retries to be used during
+      failover attempts.</description>
+    <name>yarn.client.failover-retries</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <description>When HA is enabled, the number of retries per
+      attempt to connect to a ResourceManager on socket timeouts. In other
+      words, it is the ipc.client.connect.max.retries.on.timeouts to be used
+      during failover attempts.</description>
+    <name>yarn.client.failover-retries-on-socket-timeouts</name>
+    <value>0</value>
+  </property>
+
+  <property>
     <description>The maximum number of completed applications RM keeps. </description>
     <name>yarn.resourcemanager.max-completed-applications</name>
     <value>10000</value>

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java Tue Dec 17 22:33:16 2013
@@ -23,25 +23,43 @@ import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
-public class ServerRMProxy<T> extends RMProxy<T> {
+import com.google.common.base.Preconditions;
 
+public class ServerRMProxy<T> extends RMProxy<T> {
   private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
 
+  static {
+    INSTANCE = new ServerRMProxy();
+  }
+
+  private ServerRMProxy() {
+    super();
+  }
+
+  /**
+   * Create a proxy to the ResourceManager for the specified protocol.
+   * @param configuration Configuration with all the required information.
+   * @param protocol Server protocol for which proxy is being requested.
+   * @param <T> Type of proxy.
+   * @return Proxy to the ResourceManager for the specified server protocol.
+   * @throws IOException
+   */
   public static <T> T createRMProxy(final Configuration configuration,
       final Class<T> protocol) throws IOException {
-    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
-        ? (YarnConfiguration) configuration
-        : new YarnConfiguration(configuration);
-    InetSocketAddress rmAddress = getRMAddress(conf, protocol);
-    return createRMProxy(conf, protocol, rmAddress);
+    // Exists so that this class's static initializer runs and sets INSTANCE.
+    // TODO: FIX if possible
+    return RMProxy.createRMProxy(configuration, protocol);
   }
 
-  private static InetSocketAddress getRMAddress(YarnConfiguration conf,
-                                                Class<?> protocol) {
+  @InterfaceAudience.Private
+  @Override
+  protected InetSocketAddress getRMAddress(YarnConfiguration conf,
+                                           Class<?> protocol) {
     if (protocol == ResourceTracker.class) {
       return conf.getSocketAddr(
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
@@ -55,4 +73,12 @@ public class ServerRMProxy<T> extends RM
       throw new IllegalStateException(message);
     }
   }
+
+  @InterfaceAudience.Private
+  @Override
+  protected void checkAllowedProtocols(Class<?> protocol) {
+    Preconditions.checkArgument(
+        protocol.isAssignableFrom(ResourceTracker.class),
+        "ResourceManager does not support this protocol");
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Tue Dec 17 22:33:16 2013
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -38,6 +39,7 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -65,6 +67,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
+import static org.junit.Assert.fail;
+
 /**
  * Embedded Yarn minicluster for testcases that need to interact with a cluster.
  * <p/>
@@ -91,9 +95,11 @@ public class MiniYARNCluster extends Com
 
   private NodeManager[] nodeManagers;
   private ResourceManager[] resourceManagers;
+  private String[] rmIds;
+
+  private boolean useFixedPorts;
+  private boolean useRpc = false;
 
-  private ResourceManagerWrapper resourceManagerWrapper;
-  
   private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
       new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
   
@@ -163,15 +169,7 @@ public class MiniYARNCluster extends Com
     }
 
     resourceManagers = new ResourceManager[numResourceManagers];
-    for (int i = 0; i < numResourceManagers; i++) {
-      resourceManagers[i] = new ResourceManager();
-      addService(new ResourceManagerWrapper(i));
-    }
-    nodeManagers = new CustomNodeManager[numNodeManagers];
-    for(int index = 0; index < numNodeManagers; index++) {
-      addService(new NodeManagerWrapper(index));
-      nodeManagers[index] = new CustomNodeManager();
-    }
+    nodeManagers = new NodeManager[numNodeManagers];
   }
 
   /**
@@ -185,20 +183,45 @@ public class MiniYARNCluster extends Com
     this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs);
   }
 
-    @Override
+  @Override
   public void serviceInit(Configuration conf) throws Exception {
+    useFixedPorts = conf.getBoolean(
+        YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
+        YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
+    useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
+        YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
+
+    if (useRpc && !useFixedPorts) {
+      throw new YarnRuntimeException("Invalid configuration!" +
+          " Minicluster can use rpc only when configured to use fixed ports");
+    }
+
     if (resourceManagers.length > 1) {
       conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
-
-      StringBuilder rmIds = new StringBuilder();
-      for (int i = 0; i < resourceManagers.length; i++) {
-        if (i != 0) {
-          rmIds.append(",");
+      if (conf.get(YarnConfiguration.RM_HA_IDS) == null) {
+        StringBuilder rmIds = new StringBuilder();
+        for (int i = 0; i < resourceManagers.length; i++) {
+          if (i != 0) {
+            rmIds.append(",");
+          }
+          rmIds.append("rm" + i);
         }
-        rmIds.append("rm" + i);
+        conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
       }
-      conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
+      Collection<String> rmIdsCollection = HAUtil.getRMHAIds(conf);
+      rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]);
     }
+
+    for (int i = 0; i < resourceManagers.length; i++) {
+      resourceManagers[i] = new ResourceManager();
+      addService(new ResourceManagerWrapper(i));
+    }
+    for(int index = 0; index < nodeManagers.length; index++) {
+      nodeManagers[index] =
+          useRpc ? new CustomNodeManager() : new ShortCircuitedNodeManager();
+      addService(new NodeManagerWrapper(index));
+    }
+
     super.serviceInit(
         conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
   }
@@ -213,11 +236,12 @@ public class MiniYARNCluster extends Com
    *
    * In an non-HA cluster, return the index of the only RM.
    *
-   * @return index of the active RM
+   * @return index of the active RM or -1 if none of them transition to
+   * active even after 5 seconds of waiting
    */
   @InterfaceAudience.Private
   @VisibleForTesting
-  int getActiveRMIndex() {
+  public int getActiveRMIndex() {
     if (resourceManagers.length == 1) {
       return 0;
     }
@@ -292,9 +316,7 @@ public class MiniYARNCluster extends Com
     }
 
     private void setHARMConfiguration(Configuration conf) {
-      String rmId = "rm" + index;
       String hostname = MiniYARNCluster.getHostname();
-      conf.set(YarnConfiguration.RM_HA_ID, rmId);
       for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
         for (String id : HAUtil.getRMHAIds(conf)) {
           conf.set(HAUtil.addSuffix(confKey, id), hostname + ":0");
@@ -306,15 +328,17 @@ public class MiniYARNCluster extends Com
     protected synchronized void serviceInit(Configuration conf)
         throws Exception {
       conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
-      if (!conf.getBoolean(
-          YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
-          YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
+
+      if (!useFixedPorts) {
         if (HAUtil.isHAEnabled(conf)) {
           setHARMConfiguration(conf);
         } else {
           setNonHARMConfiguration(conf);
         }
       }
+      if (HAUtil.isHAEnabled(conf)) {
+        conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
+      }
       resourceManagers[index].init(conf);
       resourceManagers[index].getRMContext().getDispatcher().register
           (RMAppAttemptEventType.class,
@@ -500,7 +524,9 @@ public class MiniYARNCluster extends Com
     protected void doSecureLogin() throws IOException {
       // Don't try to login using keytab in the testcase.
     }
+  }
 
+  private class ShortCircuitedNodeManager extends CustomNodeManager {
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
@@ -553,4 +579,28 @@ public class MiniYARNCluster extends Com
       };
     }
   }
+
+  /**
+   * Wait for all the NodeManagers to connect to the ResourceManager.
+   *
+   * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds.
+   * @return true if all NodeManagers connect to the (Active)
+   * ResourceManager, false otherwise.
+   * @throws YarnException
+   * @throws InterruptedException
+   */
+  public boolean waitForNodeManagersToConnect(long timeout)
+      throws YarnException, InterruptedException {
+    ResourceManager rm = getResourceManager();
+    GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
+
+    for (int i = 0; i < timeout / 100; i++) {
+      if (nodeManagers.length == rm.getClientRMService().getClusterMetrics(req)
+          .getClusterMetrics().getNumNodeManagers()) {
+        return true;
+      }
+      Thread.sleep(100);
+    }
+    return false;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java?rev=1551740&r1=1551739&r2=1551740&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
Tue Dec 17 22:33:16 2013
@@ -33,6 +33,7 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestMiniYARNClusterForHA {
@@ -56,16 +57,7 @@ public class TestMiniYARNClusterForHA {
 
   @Test
   public void testClusterWorks() throws YarnException, InterruptedException {
-    ResourceManager rm = cluster.getResourceManager(0);
-    GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
-
-    for (int i = 0; i < 600; i++) {
-      if (1 == rm.getClientRMService().getClusterMetrics(req)
-          .getClusterMetrics().getNumNodeManagers()) {
-        return;
-      }
-      Thread.sleep(100);
-    }
-    fail("NodeManager never registered with the RM");
+    assertTrue("NMs fail to connect to the RM",
+        cluster.waitForNodeManagersToConnect(5000));
   }
 }



Mime
View raw message