ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yus...@apache.org
Subject ambari git commit: AMBARI-12910. Ambari Views Framework - need support for accessing RM running under HA. (Dipayan Bhowmick via yusaku)
Date Mon, 31 Aug 2015 21:12:04 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 115d93c67 -> a1c510f2b


AMBARI-12910. Ambari Views Framework - need support for accessing RM running under HA. (Dipayan
Bhowmick via yusaku)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a1c510f2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a1c510f2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a1c510f2

Branch: refs/heads/branch-2.1
Commit: a1c510f2b6443f3096a7ec39d5ff8a68e186df22
Parents: 115d93c
Author: Yusaku Sako <yusaku@hortonworks.com>
Authored: Mon Aug 31 14:09:36 2015 -0700
Committer: Yusaku Sako <yusaku@hortonworks.com>
Committed: Mon Aug 31 14:11:55 2015 -0700

----------------------------------------------------------------------
 contrib/views/hive/src/main/resources/view.xml  |   2 +-
 .../ambari/view/utils/ambari/Services.java      | 178 +++++++++---
 .../ambari/view/utils/ambari/ServicesTest.java  | 281 +++++++++++++++++++
 3 files changed, 418 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a1c510f2/contrib/views/hive/src/main/resources/view.xml
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/resources/view.xml b/contrib/views/hive/src/main/resources/view.xml
index ed97213..ce0895b 100644
--- a/contrib/views/hive/src/main/resources/view.xml
+++ b/contrib/views/hive/src/main/resources/view.xml
@@ -181,7 +181,7 @@
 
     <parameter>
         <name>yarn.resourcemanager.url</name>
-        <description>The URL to the YARN ResourceManager, used to provide YARN Application
data.</description>
+        <description>The URL to the YARN ResourceManager, used to provide YARN Application
data. If YARN ResourceManager HA is enabled, provide a comma separated list of URLs for all
the Resource Managers.</description>
         <label>YARN ResourceManager URL</label>
         <placeholder>http://yarn.resourcemanager.address:8088</placeholder>
         <cluster-config>yarn-site/yarn.resourcemanager.webapp.address</cluster-config>

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1c510f2/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java
----------------------------------------------------------------------
diff --git a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java
b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java
index 0b49076..120e377 100644
--- a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java
+++ b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/ambari/Services.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,16 +19,17 @@
 package org.apache.ambari.view.utils.ambari;
 
 import org.apache.ambari.view.ViewContext;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.HttpURLConnection;
 import java.util.HashMap;
 import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Utilities for specific Hadoop services and util functions for them
@@ -39,6 +40,12 @@ public class Services {
   public static final String YARN_SITE = "yarn-site";
   public static final String YARN_HTTP_POLICY = "yarn.http.policy";
   public static final String YARN_RESOURCEMANAGER_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
+  private static final String YARN_RESOURCEMANAGER_HTTPS_KEY = "yarn.resourcemanager.webapp.https.address";
+  private static final String YARN_RESOURCEMANAGER_HTTP_KEY = "yarn.resourcemanager.webapp.address";
+  private static final String YARN_RESOURCEMANAGER_HA_RM_IDS_KEY = "yarn.resourcemanager.ha.rm-ids";
+  private static final String YARN_RESOURCEMANAGER_HTTP_HA_PARTIAL_KEY = "yarn.resourcemanager.webapp.address.";
+  private static final String YARN_RESOURCEMANAGER_HTTPS_HA_PARTIAL_KEY = "yarn.resourcemanager.webapp.https.address.";
+  public static final String RM_INFO_API_ENDPOINT = "/ws/v1/cluster/info";
 
   private final AmbariApi ambariApi;
   private ViewContext context;
@@ -60,36 +67,79 @@ public class Services {
     String url;
 
     if (ambariApi.isClusterAssociated()) {
-      String protocol;
+      url = getRMUrlFromClusterConfig();
+    } else {
+      url = getRmUrlFromCustomConfig();
+    }
+    return removeTrailingSlash(url);
+  }
+
+  private String getRMUrlFromClusterConfig() {
+    String url;
+    String protocol;
 
-      String haEnabled = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HA_ENABLED);
-      String httpPolicy = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_HTTP_POLICY);
+    String haEnabled = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HA_ENABLED);
+    String httpPolicy = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_HTTP_POLICY);
+
+    if (!(HTTP_ONLY.equals(httpPolicy) || HTTPS_ONLY.equals(httpPolicy))) {
+      LOG.error(String.format("RA030 Unknown value %s of yarn-site/yarn.http.policy. HTTP_ONLY
assumed.", httpPolicy));
+      httpPolicy = HTTP_ONLY;
+    }
+
+    if (haEnabled != null && haEnabled.equals("true")) {
+      String[] urls = getRMHAUrls(httpPolicy);
+      url = getActiveRMUrl(urls);
+    } else {
       if (httpPolicy.equals(HTTPS_ONLY)) {
         protocol = "https";
-        url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, "yarn.resourcemanager.webapp.https.address");
-
+        url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HTTPS_KEY);
       } else {
         protocol = "http";
-        url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, "yarn.resourcemanager.webapp.address");
-        if (!httpPolicy.equals(HTTP_ONLY))
-          LOG.error(String.format("RA030 Unknown value %s of yarn-site/yarn.http.policy.
HTTP_ONLY assumed.", httpPolicy));
+        url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HTTP_KEY);
       }
-
       url = addProtocolIfMissing(url, protocol);
+    }
+    return url;
+  }
 
-      if (haEnabled != null && haEnabled.equals("true")) {
-        url = getActiveRMUrl(url);
+  private String[] getRMHAUrls(String httpPolicy) {
+    String haRmIds = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HA_RM_IDS_KEY);
+    String[] ids = haRmIds.split(",");
+    int index = 0;
+    String[] urls = new String[ids.length];
+    for (String id : ids) {
+      String url, protocol;
+      if (HTTPS_ONLY.equals(httpPolicy)) {
+        url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HTTPS_HA_PARTIAL_KEY
+ id);
+        protocol = "https";
+      } else {
+        url = ambariApi.getCluster().getConfigurationValue(YARN_SITE, YARN_RESOURCEMANAGER_HTTP_HA_PARTIAL_KEY
+ id);
+        protocol = "http";
       }
-    } else {
-      url = context.getProperties().get("yarn.resourcemanager.url");
-      if (!hasProtocol(url)) {
+
+      urls[index++] = addProtocolIfMissing(url.trim(), protocol);
+    }
+    return urls;
+  }
+
+  private String getRmUrlFromCustomConfig() {
+    // Comma separated list of URLs for HA and single URL for non HA
+    String resourceManagerUrls = context.getProperties().get("yarn.resourcemanager.url");
+    if (!StringUtils.isEmpty(resourceManagerUrls)) {
+      String[] urls = resourceManagerUrls.split(",");
+
+      if (!hasProtocol(urls)) {
         throw new AmbariApiException(
-            "RA070 View is not cluster associated. Resource Manager URL should contain protocol.");
+          "RA070 View is not cluster associated. All Resource Manager URL should contain
protocol.");
       }
+      return getActiveRMUrl(urls);
+    } else {
+      throw new AmbariApiException(
+        "RA070 View is not cluster associated. 'YARN ResourceManager URL' should be provided");
     }
-    return removeTrailingSlash(url);
   }
 
+
   private String removeTrailingSlash(String url) {
     if (url.endsWith("/")) {
       url = url.substring(0, url.length() - 1);
@@ -97,30 +147,60 @@ public class Services {
     return url;
   }
 
-  public final Pattern refreshHeaderUrlPattern = Pattern.compile("^\\d+;\\s*url=(.*)$");
+  /**
+   * Returns active RM URL. All RM Urls for RM HA is passed as an argument. This iterates
over the list of RM hosts
+   * and gets the cluster info. Breaks out and returns the URL when the 'haStatus' parameter
returns "ACTIVE".
+   * If only one url is passed, it is considered as ACTIVE and returned. No API call is made
in that case.
+   * @param urls array of all the RM Urls
+   * @return url of the active RM
+   */
+  private String getActiveRMUrl(String[] urls) {
+    if (urls.length == 1)
+      return urls[0].trim();
+    else {
+      for (String url : urls) {
+        url = url.trim();
+        if (isActiveUrl(url))
+          return url;
+      }
+    }
+    LOG.error("All ResourceManagers are not accessible or none seem to be active.");
+    throw new AmbariApiException("RA110 All ResourceManagers are not accessible or none seem
to be active.");
+  }
 
   /**
-   * Returns active RM URL. Makes a request to RM passed as argument.
-   * If response contains Refresh header then passed url was standby RM.
-   * @param url url of random RM
-   * @return url of active RM
+   * Queries RM API to check the haState.
+   * @param url Resource Manager root url
+   * @return true if haState returned is ACTIVE else false
    */
-  private String getActiveRMUrl(String url) {
-    String activeRMUrl = url;
+
+  private boolean isActiveUrl(String url) {
+    InputStream inputStream = null;
     try {
-      HttpURLConnection httpURLConnection = context.getURLConnectionProvider().
-          getConnection(url, "GET", (String) null, new HashMap<String, String>());
-      String refreshHeader = httpURLConnection.getHeaderField("Refresh");
-      if (refreshHeader != null) { // we hit standby RM
-        Matcher matcher = refreshHeaderUrlPattern.matcher(refreshHeader);
-        if (matcher.find()) {
-          activeRMUrl = matcher.group(1);
-        }
-      }
+      inputStream = context.getURLStreamProvider()
+        .readFrom(url + RM_INFO_API_ENDPOINT, "GET", (String) null, new HashMap<String,
String>());
+      String response = IOUtils.toString(inputStream);
+      String haState = getHAStateFromRMResponse(response);
+
+      if (StringUtils.isNotEmpty(haState) && "ACTIVE".equals(haState))
+        return true;
+
     } catch (IOException e) {
-      throw new AmbariApiException("RA110 ResourceManager is not accessible");
+      LOG.error("Resource Manager : %s is not accessible. This cannot be a active RM. Returning
false.");
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close();
+        } catch (IOException e) { /* Noting to do */ }
+      }
     }
-    return activeRMUrl;
+    return false;
+  }
+
+  private String getHAStateFromRMResponse(String response) {
+    JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
+    JSONObject clusterInfo = (JSONObject) jsonObject.get("clusterInfo");
+    return (String) clusterInfo.get("haState");
   }
 
   /**
@@ -145,14 +225,14 @@ public class Services {
       host = context.getProperties().get("webhcat.hostname");
       if (host == null || host.isEmpty()) {
         throw new AmbariApiException(
-            "RA080 Can't determine WebHCat hostname neither by associated cluster nor by
webhcat.hostname property.");
+          "RA080 Can't determine WebHCat hostname neither by associated cluster nor by webhcat.hostname
property.");
       }
     }
 
     String port = context.getProperties().get("webhcat.port");
     if (port == null || port.isEmpty()) {
       throw new AmbariApiException(
-          "RA090 Can't determine WebHCat port neither by associated cluster nor by webhcat.port
property.");
+        "RA090 Can't determine WebHCat port neither by associated cluster nor by webhcat.port
property.");
     }
 
     return String.format("http://%s:%s/templeton/v1", host, port);
@@ -165,6 +245,20 @@ public class Services {
     return url;
   }
 
+
+  /**
+   * Checks if all the urls in the array contains protocol
+   * @param urls Array of urls
+   * @return true if all the urls contain protocol
+   */
+  public static boolean hasProtocol(String[] urls) {
+    for (String url : urls) {
+      if (!hasProtocol(url))
+        return false;
+    }
+    return true;
+  }
+
   /**
    * Checks if URL has the protocol
    * @param url url

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1c510f2/contrib/views/utils/src/test/java/org/apache/ambari/view/utils/ambari/ServicesTest.java
----------------------------------------------------------------------
diff --git a/contrib/views/utils/src/test/java/org/apache/ambari/view/utils/ambari/ServicesTest.java
b/contrib/views/utils/src/test/java/org/apache/ambari/view/utils/ambari/ServicesTest.java
new file mode 100644
index 0000000..1950df8
--- /dev/null
+++ b/contrib/views/utils/src/test/java/org/apache/ambari/view/utils/ambari/ServicesTest.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.utils.ambari;
+
+
+import org.apache.ambari.view.URLStreamProvider;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.cluster.Cluster;
+import org.apache.commons.io.IOUtils;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+
+public class ServicesTest extends EasyMockSupport {
+
+  private static final String HTTP_RM_URL1 = "http://c1.ambari.apache.org:8088";
+  private static final String HTTP_RM_URL2 = "http://c2.ambari.apache.org:8088";
+  private static final String HTTPS_RM_URL1 = "https://c1.ambari.apache.org:8088";
+  private static final String HTTPS_RM_URL2 = "https://c2.ambari.apache.org:8088";
+  private static final String RM_URL1_HOST_PORT = "c1.ambari.apache.org:8088";
+  private static final String RM_URL2_HOST_PORT = "c2.ambari.apache.org:8088";
+  private static final String RM_INFO_API_ENDPOINT = Services.RM_INFO_API_ENDPOINT;
+
+  @Test(expected = AmbariApiException.class)
+  public void shouldCheckForEmptyYarnRMUrlInCustomConfig() {
+    ViewContext viewContext = getViewContext(new HashMap<String, String>());
+    AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+    expect(ambariApi.isClusterAssociated()).andReturn(false);
+
+    replay(viewContext);
+
+    Services services = new Services(ambariApi, viewContext);
+    services.getRMUrl();
+
+
+  }
+
+  @Test(expected = AmbariApiException.class)
+  public void shouldCheckIfAllRMUrlsHaveProtocolInCustomConfig() {
+    Map<String, String> map = new HashMap<>();
+    map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + "," + RM_URL2_HOST_PORT);
+    ViewContext viewContext = getViewContext(map);
+    AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+    expect(ambariApi.isClusterAssociated()).andReturn(false);
+
+    replay(viewContext);
+
+    Services services = new Services(ambariApi, viewContext);
+    services.getRMUrl();
+  }
+
+  @Test
+  public void shouldReturnUrlIfSingleIsConfiguredInCustomConfig() {
+    Map<String, String> map = new HashMap<>();
+    map.put("yarn.resourcemanager.url", HTTP_RM_URL1);
+    ViewContext viewContext = getViewContext(map);
+    AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+    expect(ambariApi.isClusterAssociated()).andReturn(false);
+
+    replay(viewContext);
+
+    Services services = new Services(ambariApi, viewContext);
+    assertEquals(HTTP_RM_URL1, services.getRMUrl());
+  }
+
+  @Test
+  public void shouldConnectToFirstUrlWhenMultipleRMUrlIsConfiguredInCustomConfig() throws
IOException {
+    Map<String, String> map = new HashMap<>();
+    map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + ", " + HTTP_RM_URL2);
+    ViewContext viewContext = getViewContext(map);
+
+    AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+    URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+
+    InputStream inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+
+    expect(ambariApi.isClusterAssociated()).andReturn(false);
+    expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider);
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+
+    replayAll();
+
+    Services services = new Services(ambariApi, viewContext);
+    assertEquals(HTTP_RM_URL1, services.getRMUrl());
+
+  }
+
+  @Test
+  public void shouldConnectToSecondUrlWhenTheFirstURLTimesOut() throws Exception {
+    Map<String, String> map = new HashMap<>();
+    map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + ", " + HTTP_RM_URL2);
+    ViewContext viewContext = getViewContext(map);
+
+    AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+    URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+
+    InputStream inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+
+    expect(ambariApi.isClusterAssociated()).andReturn(false);
+    expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider).anyTimes();
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+
+    replayAll();
+
+    Services services = new Services(ambariApi, viewContext);
+    assertEquals(HTTP_RM_URL2, services.getRMUrl());
+
+  }
+
+  @Test(expected = AmbariApiException.class)
+  public void shouldThrowExceptionWhenAllUrlCannotBeReached() throws Exception {
+    Map<String, String> map = new HashMap<>();
+    map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + ", " + HTTP_RM_URL2);
+    ViewContext viewContext = getViewContext(map);
+
+    AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+    URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+
+    expect(ambariApi.isClusterAssociated()).andReturn(false);
+    expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider).anyTimes();
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+
+    replayAll();
+
+    Services services = new Services(ambariApi, viewContext);
+    services.getRMUrl();
+  }
+
+  @Test
+  public void shouldReturnActiveRMUrlWhenConnectingToStandby() throws Exception {
+    Map<String, String> map = new HashMap<>();
+    map.put("yarn.resourcemanager.url", HTTP_RM_URL1 + ", " + HTTP_RM_URL2);
+    ViewContext viewContext = getViewContext(map);
+
+    AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+    URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+
+    InputStream inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"STANDBY\"}}");
+
+    expect(ambariApi.isClusterAssociated()).andReturn(false);
+    expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider).anyTimes();
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+
+    InputStream inputStreamActive = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\":
\"ACTIVE\"}}");
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStreamActive);
+
+    replayAll();
+
+    Services services = new Services(ambariApi, viewContext);
+    assertEquals(HTTP_RM_URL2, services.getRMUrl());
+
+    verify(urlStreamProvider);
+
+  }
+
+  @Test
+  public void shouldConnectToRMConfiguredInClusterMode() throws Exception {
+    ViewContext viewContext = getViewContext(new HashMap<String, String>());
+    AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+    Cluster cluster = createNiceMock(Cluster.class);
+
+    expect(ambariApi.isClusterAssociated()).andReturn(true).anyTimes();
+    setClusterExpectation(cluster, "HTTP_ONLY");
+    expect(ambariApi.getCluster()).andReturn(cluster).anyTimes();
+
+    replayAll();
+
+    Services services = new Services(ambariApi, viewContext);
+    assertEquals(HTTP_RM_URL1, services.getRMUrl());
+
+    reset(cluster);
+    setClusterExpectation(cluster, "HTTPS_ONLY");
+    replay(cluster);
+
+    assertEquals(HTTPS_RM_URL2, services.getRMUrl());
+
+    reset(cluster);
+    setClusterExpectation(cluster, "HTTPS_ONLY_XYZ");
+    replay(cluster);
+
+    assertEquals(HTTP_RM_URL1, services.getRMUrl());
+  }
+
+  @Test
+  public void shouldFetchRMUrlsWhileHAEnabledInClusterMode() throws Exception {
+    ViewContext viewContext = getViewContext(new HashMap<String, String>());
+    AmbariApi ambariApi = createNiceMock(AmbariApi.class);
+    Cluster cluster = createNiceMock(Cluster.class);
+    URLStreamProvider urlStreamProvider = createNiceMock(URLStreamProvider.class);
+    Services services = new Services(ambariApi, viewContext);
+
+    InputStream inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+
+
+
+    expect(ambariApi.isClusterAssociated()).andReturn(true).anyTimes();
+    setClusterExpectationInHA(cluster, "HTTP_ONLY");
+    expect(ambariApi.getCluster()).andReturn(cluster).anyTimes();
+    expect(viewContext.getURLStreamProvider()).andReturn(urlStreamProvider).anyTimes();
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+
+    replayAll();
+
+    assertEquals(HTTP_RM_URL1, services.getRMUrl());
+
+    reset(cluster, urlStreamProvider);
+    setClusterExpectationInHA(cluster, "HTTP_ONLY");
+    inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+    expect(urlStreamProvider.readFrom(eq(HTTP_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+    replay(cluster, urlStreamProvider);
+
+    assertEquals(HTTP_RM_URL2, services.getRMUrl());
+
+    reset(cluster, urlStreamProvider);
+    setClusterExpectationInHA(cluster, "HTTPS_ONLY");
+    inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+    expect(urlStreamProvider.readFrom(eq(HTTPS_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+    replay(cluster, urlStreamProvider);
+
+    assertEquals(HTTPS_RM_URL1, services.getRMUrl());
+
+    reset(cluster, urlStreamProvider);
+    setClusterExpectationInHA(cluster, "HTTPS_ONLY");
+    inputStream = IOUtils.toInputStream("{\"clusterInfo\": {\"haState\": \"ACTIVE\"}}");
+    expect(urlStreamProvider.readFrom(eq(HTTPS_RM_URL1 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andThrow(new IOException());
+    expect(urlStreamProvider.readFrom(eq(HTTPS_RM_URL2 + RM_INFO_API_ENDPOINT), eq("GET"),
anyString(), EasyMock.<Map<String, String>>anyObject())).andReturn(inputStream);
+    replay(cluster, urlStreamProvider);
+
+    assertEquals(HTTPS_RM_URL2, services.getRMUrl());
+  }
+
+  private void setClusterExpectation(Cluster cluster, String httpPolicy) {
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.ha.enabled")).andReturn("false");
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.http.policy")).andReturn(httpPolicy);
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.address")).andReturn(RM_URL1_HOST_PORT);
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.https.address")).andReturn(RM_URL2_HOST_PORT);
+  }
+
+  private void setClusterExpectationInHA(Cluster cluster, String httpPolicy) {
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.ha.enabled")).andReturn("true");
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.http.policy")).andReturn(httpPolicy);
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.ha.rm-ids")).andReturn("rm1,rm2");
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.address.rm1")).andReturn(RM_URL1_HOST_PORT);
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.address.rm2")).andReturn(RM_URL2_HOST_PORT);
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.https.address.rm1")).andReturn(RM_URL1_HOST_PORT);
+    expect(cluster.getConfigurationValue("yarn-site", "yarn.resourcemanager.webapp.https.address.rm2")).andReturn(RM_URL2_HOST_PORT);
+  }
+
+  private ViewContext getViewContext(Map<String, String> map) {
+    ViewContext viewContextMock = createNiceMock(ViewContext.class);
+    expect(viewContextMock.getProperties()).andReturn(map);
+    return viewContextMock;
+  }
+}
\ No newline at end of file


Mime
View raw message