hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
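Subject svn commit: r1556380 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/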
Date Tue, 07 Jan 2014 22:16:30 GMT
Author: vinodkv
Date: Tue Jan  7 22:16:30 2014
New Revision: 1556380

URL: http://svn.apache.org/r1556380
Log:
YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager fail-over.
Contributed by Xuan Gong.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
Removed:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jan  7 22:16:30 2014
@@ -200,6 +200,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
     app-attempts separately from apps. (Jian He via vinodkv)
 
+    YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager
+    fail-over. (Xuan Gong via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java Tue Jan  7 22:16:30 2014
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,13 +33,18 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -55,6 +62,8 @@ public class TestRMFailover extends Clie
 
   private Configuration conf;
   private MiniYARNCluster cluster;
+  private ApplicationId fakeAppId;
+
 
   private void setConfForRM(String rmId, String prefix, String value) {
     conf.set(HAUtil.addSuffix(prefix, rmId), value);
@@ -77,6 +86,7 @@ public class TestRMFailover extends Clie
 
   @Before
   public void setup() throws IOException {
+    fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
     conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
     conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@@ -179,4 +189,67 @@ public class TestRMFailover extends Clie
     failover();
     verifyConnections();
   }
+
+  /**
+   * Starts a stand-alone {@link WebAppProxyServer} next to the HA mini
+   * cluster and checks that a proxy request for an unknown application is
+   * answered with the "application not found" error both before and after
+   * an explicit ResourceManager fail-over.
+   *
+   * @throws YarnException propagated from the cluster/admin calls
+   * @throws InterruptedException propagated from the cluster/admin calls
+   * @throws IOException if the proxy cannot be reached over HTTP
+   */
+  @Test
+  public void testWebAppProxyInStandAloneMode() throws YarnException,
+      InterruptedException, IOException {
+    WebAppProxyServer webAppProxyServer = new WebAppProxyServer();
+    try {
+      conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099");
+      cluster.init(conf);
+      cluster.start();
+      getAdminService(0).transitionToActive(req);
+      assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+      verifyConnections();
+      webAppProxyServer.init(conf);
+
+      // Start webAppProxyServer
+      Assert.assertEquals(STATE.INITED, webAppProxyServer.getServiceState());
+      webAppProxyServer.start();
+      Assert.assertEquals(STATE.STARTED, webAppProxyServer.getServiceState());
+
+      URL wrongUrl = new URL("http://0.0.0.0:9099/proxy/" + fakeAppId);
+
+      // Request before fail-over.
+      HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
+          .openConnection();
+      try {
+        proxyConn.connect();
+        verifyExpectedException(proxyConn.getResponseMessage());
+      } finally {
+        proxyConn.disconnect();
+      }
+
+      explicitFailover();
+      verifyConnections();
+
+      // An HttpURLConnection instance carries a single request: connect() on
+      // an already-connected instance is ignored and getResponseMessage()
+      // keeps returning the response that was read first. Open a fresh
+      // connection so this check really goes through the proxy again after
+      // the fail-over.
+      proxyConn = (HttpURLConnection) wrongUrl.openConnection();
+      try {
+        proxyConn.connect();
+        verifyExpectedException(proxyConn.getResponseMessage());
+      } finally {
+        proxyConn.disconnect();
+      }
+    } finally {
+      webAppProxyServer.stop();
+    }
+  }
+
+  /**
+   * Checks the web proxy that is reached through the RM web address used by
+   * this test (port 18088): a proxy request for an unknown application must
+   * be answered with the "application not found" error both before and after
+   * an explicit ResourceManager fail-over.
+   *
+   * @throws YarnException propagated from the cluster/admin calls
+   * @throws InterruptedException propagated from the cluster/admin calls
+   * @throws IOException if the proxy cannot be reached over HTTP
+   */
+  @Test
+  public void testEmbeddedWebAppProxy() throws YarnException,
+      InterruptedException, IOException {
+    cluster.init(conf);
+    cluster.start();
+    getAdminService(0).transitionToActive(req);
+    assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+    verifyConnections();
+    URL wrongUrl = new URL("http://0.0.0.0:18088/proxy/" + fakeAppId);
+
+    // Request before fail-over.
+    HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
+        .openConnection();
+    try {
+      proxyConn.connect();
+      verifyExpectedException(proxyConn.getResponseMessage());
+    } finally {
+      proxyConn.disconnect();
+    }
+
+    explicitFailover();
+    verifyConnections();
+
+    // An HttpURLConnection instance carries a single request: connect() on an
+    // already-connected instance is ignored and getResponseMessage() keeps
+    // returning the response that was read first. Open a fresh connection so
+    // this check really goes through the proxy again after the fail-over.
+    proxyConn = (HttpURLConnection) wrongUrl.openConnection();
+    try {
+      proxyConn.connect();
+      verifyExpectedException(proxyConn.getResponseMessage());
+    } finally {
+      proxyConn.disconnect();
+    }
+  }
+
+  /**
+   * Asserts that an HTTP response message names
+   * {@link ApplicationNotFoundException} and states that the application
+   * identified by {@code fakeAppId} does not exist in the RM.
+   *
+   * @param exceptionMessage response message obtained from the proxy
+   *        connection; a null value fails the test
+   */
+  private void verifyExpectedException(String exceptionMessage){
+    // HttpURLConnection.getResponseMessage() may return null; report that as
+    // a readable test failure instead of a NullPointerException.
+    Assert.assertNotNull("Proxy returned no response message",
+        exceptionMessage);
+    assertTrue("Unexpected response message: " + exceptionMessage,
+        exceptionMessage.contains(
+            ApplicationNotFoundException.class.getName()));
+    assertTrue("Unexpected response message: " + exceptionMessage,
+        exceptionMessage
+            .contains("Application with id '" + fakeAppId + "' " +
+                "doesn't exist in RM."));
+  }
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java?rev=1556380&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java Tue Jan  7 22:16:30 2014
@@ -0,0 +1,118 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Entry point for creating client-side proxies to the ResourceManager.
+ * <p>
+ * This class chooses the ResourceManager address for the requested protocol
+ * ({@link ApplicationClientProtocol},
+ * {@link ResourceManagerAdministrationProtocol} or
+ * {@link ApplicationMasterProtocol}) and hands the actual proxy creation to
+ * the {@code createRMProxy} method inherited from {@link RMProxy}.
+ *
+ * @param <T> protocol type parameter passed through to {@link RMProxy}
+ */
+public class ClientRMProxy<T> extends RMProxy<T>  {
+  private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
+  // Single shared instance passed to the inherited createRMProxy();
+  // presumably used there to reach the overrides below - confirm in RMProxy.
+  private static final ClientRMProxy INSTANCE = new ClientRMProxy();
+
+  /**
+   * Combines every protocol a proxy may be requested for. It is only used as
+   * the reference type of the check in
+   * {@link #checkAllowedProtocols(Class)}.
+   */
+  private interface ClientRMProtocols extends ApplicationClientProtocol,
+      ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
+    // Add nothing
+  }
+
+  /** Not instantiable from outside; see {@link #INSTANCE}. */
+  private ClientRMProxy(){
+    super();
+  }
+
+  /**
+   * Create a proxy to the ResourceManager for the specified protocol.
+   * @param configuration Configuration with all the required information.
+   * @param protocol Client protocol for which proxy is being requested.
+   * @param <T> Type of proxy.
+   * @return Proxy to the ResourceManager for the specified client protocol.
+   * @throws IOException propagated from the underlying proxy creation.
+   */
+  public static <T> T createRMProxy(final Configuration configuration,
+      final Class<T> protocol) throws IOException {
+    return createRMProxy(configuration, protocol, INSTANCE);
+  }
+
+  /**
+   * Sets the given address as the service of every AMRM token held by the
+   * current user.
+   *
+   * @param resourceManagerAddress address to record on the AMRM tokens
+   * @throws IOException if the current user cannot be determined
+   */
+  private static void setupTokens(InetSocketAddress resourceManagerAddress)
+      throws IOException {
+    // It is assumed for now that the only AMRMToken in AM's UGI is for this
+    // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
+    // default service-address, see YARN-986.
+    for (Token<? extends TokenIdentifier> token : UserGroupInformation
+      .getCurrentUser().getTokens()) {
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        // This token needs to be directly provided to the AMs, so set the
+        // appropriate service-name. We'll need more infrastructure when we
+        // need to set it in HA case.
+        SecurityUtil.setTokenService(token, resourceManagerAddress);
+      }
+    }
+  }
+
+  /**
+   * Looks up the ResourceManager address that serves the given protocol.
+   * <p>
+   * For {@link ApplicationMasterProtocol} the resolved scheduler address is
+   * additionally recorded on the current user's AMRM tokens.
+   *
+   * @param conf configuration to read the address from
+   * @param protocol protocol the proxy is being created for
+   * @return the configured (or default) address for that protocol
+   * @throws IOException if updating the AMRM tokens fails
+   * @throws IllegalStateException if the protocol is null or not one of the
+   *         three supported protocols
+   */
+  @InterfaceAudience.Private
+  @Override
+  protected InetSocketAddress getRMAddress(YarnConfiguration conf,
+      Class<?> protocol) throws IOException {
+    if (protocol == ApplicationClientProtocol.class) {
+      return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_PORT);
+    } else if (protocol == ResourceManagerAdministrationProtocol.class) {
+      return conf.getSocketAddr(
+          YarnConfiguration.RM_ADMIN_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
+    } else if (protocol == ApplicationMasterProtocol.class) {
+      InetSocketAddress serviceAddr =
+          conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+      setupTokens(serviceAddr);
+      return serviceAddr;
+    } else {
+      // protocol is itself a Class object, so report its own name; calling
+      // getClass() on it would always yield "java.lang.Class".
+      String message = "Unsupported protocol found when creating the proxy " +
+          "connection to ResourceManager: " +
+          ((protocol != null) ? protocol.getName() : "null");
+      LOG.error(message);
+      throw new IllegalStateException(message);
+    }
+  }
+
+  /**
+   * Rejects protocols that this proxy does not serve.
+   *
+   * @param protocol protocol a proxy is being requested for
+   * @throws IllegalArgumentException if {@code ClientRMProtocols} is not
+   *         assignable to {@code protocol}
+   */
+  @InterfaceAudience.Private
+  @Override
+  protected void checkAllowedProtocols(Class<?> protocol) {
+    Preconditions.checkArgument(
+        protocol.isAssignableFrom(ClientRMProtocols.class),
+        "RM does not support this client protocol");
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Jan  7 22:16:30 2014
@@ -147,9 +147,12 @@ public class ResourceManager extends Com
   protected QueueACLsManager queueACLsManager;
   private DelegationTokenRenewer delegationTokenRenewer;
   private WebApp webApp;
+  private AppReportFetcher fetcher = null;
   protected ResourceTrackerService resourceTracker;
   private boolean recoveryEnabled;
 
+  private String webAppAddress;
+
   /** End of Active services */
 
   private Configuration conf;
@@ -194,6 +197,8 @@ public class ResourceManager extends Com
     }
     createAndInitActiveServices();
 
+    webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(conf);
+
     super.serviceInit(conf);
   }
   
@@ -437,22 +442,12 @@ public class ResourceManager extends Com
           throw e;
         }
       }
-      startWepApp();
-
-      if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
-        int port = webApp.port();
-        WebAppUtils.setRMWebAppPort(conf, port);
-      }
 
       super.serviceStart();
     }
 
     @Override
     protected void serviceStop() throws Exception {
-      if (webApp != null) {
-        webApp.stop();
-      }
-
 
       DefaultMetricsSystem.shutdown();
 
@@ -752,12 +747,16 @@ public class ResourceManager extends Com
                 YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
             .withHttpSpnegoKeytabKey(
                 YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
-            .at(WebAppUtils.getRMWebAppURLWithoutScheme(conf)); 
+            .at(webAppAddress);
     String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf);
     if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
         equals(proxyHostAndPort)) {
-      AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService());
-      builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, 
+      if (HAUtil.isHAEnabled(conf)) {
+        fetcher = new AppReportFetcher(conf);
+      } else {
+        fetcher = new AppReportFetcher(conf, getClientRMService());
+      }
+      builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
           ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
       builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
       String[] proxyParts = proxyHostAndPort.split(":");
@@ -854,6 +853,11 @@ public class ResourceManager extends Com
       transitionToActive();
     }
 
+    startWepApp();
+    if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      int port = webApp.port();
+      WebAppUtils.setRMWebAppPort(conf, port);
+    }
     super.serviceStart();
   }
   
@@ -864,6 +868,12 @@ public class ResourceManager extends Com
 
   @Override
   protected void serviceStop() throws Exception {
+    if (webApp != null) {
+      webApp.stop();
+    }
+    if (fetcher != null) {
+      fetcher.stop();
+    }
     super.serviceStop();
     transitionToStandby(false);
     rmContext.setHAServiceState(HAServiceState.STOPPING);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java Tue Jan  7 22:16:30 2014
@@ -19,21 +19,20 @@
 package org.apache.hadoop.yarn.server.webproxy;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 
 /**
  * This class abstracts away how ApplicationReports are fetched.
@@ -50,16 +49,12 @@ public class AppReportFetcher {
    */
   public AppReportFetcher(Configuration conf) {
     this.conf = conf;
-    YarnRPC rpc = YarnRPC.create(this.conf);
-    InetSocketAddress rmAddress = conf.getSocketAddr(
-            YarnConfiguration.RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_PORT);
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    applicationsManager =
-        (ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class,
-            rmAddress, this.conf);
-    LOG.info("Connected to ResourceManager at " + rmAddress);  
+    try {
+      applicationsManager = ClientRMProxy.createRMProxy(conf,
+          ApplicationClientProtocol.class);
+    } catch (IOException e) {
+      throw new YarnRuntimeException(e);
+    }
   }
   
   /**
@@ -91,4 +86,10 @@ public class AppReportFetcher {
         .getApplicationReport(request);
     return response.getApplicationReport();
   }
+
+  /**
+   * Hands the {@code applicationsManager} held by this fetcher to
+   * {@link RPC#stopProxy(Object)}. Does nothing when none is set.
+   */
+  public void stop() {
+    if (this.applicationsManager == null) {
+      return;
+    }
+    // NOTE(review): the ResourceManager change in this commit also builds a
+    // fetcher via AppReportFetcher(conf, getClientRMService()); confirm that
+    // stopProxy() is appropriate for an instance supplied that way.
+    RPC.stopProxy(this.applicationsManager);
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java?rev=1556380&r1=1556379&r2=1556380&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java Tue Jan  7 22:16:30 2014
@@ -117,6 +117,9 @@ public class WebAppProxy extends Abstrac
         throw new YarnRuntimeException("Error stopping proxy web server",e);
       }
     }
+    if(this.fetcher != null) {
+      this.fetcher.stop();
+    }
     super.serviceStop();
   }
 



Mime
View raw message