Return-Path: X-Original-To: apmail-apex-commits-archive@minotaur.apache.org Delivered-To: apmail-apex-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 91AF418CDA for ; Sun, 11 Oct 2015 16:05:33 +0000 (UTC) Received: (qmail 95965 invoked by uid 500); 11 Oct 2015 16:05:33 -0000 Delivered-To: apmail-apex-commits-archive@apex.apache.org Received: (qmail 95926 invoked by uid 500); 11 Oct 2015 16:05:33 -0000 Mailing-List: contact commits-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list commits@apex.incubator.apache.org Received: (qmail 95917 invoked by uid 99); 11 Oct 2015 16:05:33 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 11 Oct 2015 16:05:33 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id D890C180ECA for ; Sun, 11 Oct 2015 16:05:32 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id gq32fpLo_loM for ; Sun, 11 Oct 2015 16:05:30 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 37BB0439FB for ; Sun, 11 Oct 2015 16:05:18 +0000 (UTC) Received: (qmail 95203 invoked by uid 99); 11 Oct 2015 16:05:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 11 Oct 2015 16:05:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A790CE0A83; Sun, 11 Oct 2015 16:05:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vrozov@apache.org To: commits@apex.incubator.apache.org Date: Sun, 11 Oct 2015 16:05:37 -0000 Message-Id: In-Reply-To: <05481e5f89f84da8afcd24528691195d@git.apache.org> References: <05481e5f89f84da8afcd24528691195d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [22/29] incubator-apex-core git commit: APEX-181 expose getting of rm web app address APEX-181 expose getting of rm web app address Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/12af6140 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/12af6140 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/12af6140 Branch: refs/heads/feature-module Commit: 12af6140fe89c688b9add8dfe07f27fcd4c8ffa3 Parents: b986f70 Author: David Yan Authored: Mon Oct 5 13:08:03 2015 -0700 Committer: David Yan Committed: Wed Oct 7 16:48:28 2015 -0700 ---------------------------------------------------------------------- .../stram/client/StramClientUtils.java | 45 +++++++++++ .../security/StramWSFilterInitializer.java | 50 ++---------- .../stram/client/StramClientUtilsTest.java | 81 ++++++++++++++++++++ 3 files changed, 134 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/12af6140/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index d679da4..04a3484 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -34,9 +34,11 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -732,4 +734,47 @@ public class StramClientUtils return null; } + public static InetSocketAddress getRMWebAddress(Configuration conf, String rmId) + { + boolean sslEnabled = conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT); + return getRMWebAddress(conf, sslEnabled, rmId); + } + + public static InetSocketAddress getRMWebAddress(Configuration conf, boolean sslEnabled, String rmId) + { + rmId = (rmId == null) ? "" : ("." + rmId); + InetSocketAddress address; + if (sslEnabled) { + address = conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + rmId, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT); + } else { + address = conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS + rmId, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_PORT); + } + LOG.info("rm webapp address setting {}", address); + LOG.debug("rm setting sources {}", conf.getPropertySources(YarnConfiguration.RM_WEBAPP_ADDRESS)); + InetSocketAddress resolvedSocketAddress = NetUtils.getConnectAddress(address); + InetAddress resolved = resolvedSocketAddress.getAddress(); + if (resolved == null || resolved.isAnyLocalAddress() || resolved.isLoopbackAddress()) { + try { + resolvedSocketAddress = InetSocketAddress.createUnresolved(InetAddress.getLocalHost().getCanonicalHostName(), address.getPort()); + } catch (UnknownHostException e) { + //Ignore and fallback. + } + } + return resolvedSocketAddress; + } + + public static String getSocketConnectString(InetSocketAddress socketAddress) + { + String host; + InetAddress address = socketAddress.getAddress(); + if (address == null) { + host = socketAddress.getHostString(); + } else if (address.isAnyLocalAddress() || address.isLoopbackAddress()) { + host = address.getCanonicalHostName(); + } else { + host = address.getHostName(); + } + return host + ":" + socketAddress.getPort(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/12af6140/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java index a2b2821..e9870c6 100644 --- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java +++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java @@ -18,24 +18,21 @@ */ package com.datatorrent.stram.security; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.http.FilterContainer; import org.apache.hadoop.http.FilterInitializer; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.stram.client.StramClientUtils; import com.datatorrent.stram.util.ConfigUtils; /** @@ -100,51 +97,20 @@ public class StramWSFilterInitializer extends FilterInitializer Replace with methods from Hadoop when HA support is available HttpConfig is not used as it's audience is private as well and it's interface has changed from Hadoop 2.2 to 2.6 */ - public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, String rmId) { - boolean sslEnabled = conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, - CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT); - return getResolvedRMWebAppURLWithoutScheme(conf, sslEnabled, (rmId != null) ? "." + rmId : ""); + public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, String rmId) + { + InetSocketAddress socketAddress = StramClientUtils.getRMWebAddress(conf, rmId); + return StramClientUtils.getSocketConnectString(socketAddress); } /* From org.apache.hadoop.yarn.webapp.util.WebAppUtils Modified for HA support */ - public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmPrpKey) + public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmId) { - InetSocketAddress address = null; - if (sslEnabled) { - address = - conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + rmPrpKey, - YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT); - } else { - address = - conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS + rmPrpKey, - YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_RM_WEBAPP_PORT); - } - logger.info("rm webapp address setting {}", address); - logger.debug("rm setting sources {}", conf.getPropertySources(YarnConfiguration.RM_WEBAPP_ADDRESS)); - address = NetUtils.getConnectAddress(address); - StringBuffer sb = new StringBuffer(); - InetAddress resolved = address.getAddress(); - if (resolved == null || resolved.isAnyLocalAddress() || - resolved.isLoopbackAddress()) { - String lh = address.getHostName(); - try { - lh = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException e) { - //Ignore and fallback. - } - sb.append(lh); - } else { - sb.append(address.getHostName()); - } - sb.append(":").append(address.getPort()); - logger.info("rm webapp resolved hostname {}", sb.toString()); - return sb.toString(); + InetSocketAddress socketAddress = StramClientUtils.getRMWebAddress(conf, sslEnabled, rmId); + return StramClientUtils.getSocketConnectString(socketAddress); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/12af6140/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java new file mode 100644 index 0000000..2392b47 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.client; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.datatorrent.stram.util.ConfigUtils; + + +/** + * Unit tests for StramClientUtils + */ +public class StramClientUtilsTest +{ + + private String getHostString(String host) throws UnknownHostException + { + InetAddress address = InetAddress.getByName(host); + if (address.isAnyLocalAddress() || address.isLoopbackAddress()) { + return address.getCanonicalHostName(); + } else { + return address.getHostName(); + } + } + + @Test + public void testRMWebAddress() throws UnknownHostException + { + Configuration conf = new Configuration(false); + + // basic test + conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, false); + conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "192.168.1.1:8032"); + conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "192.168.1.2:8032"); + Assert.assertEquals(getHostString("192.168.1.1") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null))); + conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, true); + Assert.assertEquals(getHostString("192.168.1.2") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null))); + + // set localhost if host is unknown + conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "someunknownhost:8032"); + + Assert.assertEquals(InetAddress.getLocalHost().getCanonicalHostName() + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null))); + + // set localhost + conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "127.0.0.1:8032"); + Assert.assertEquals(InetAddress.getLocalHost().getCanonicalHostName() + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null))); + + // test when HA is enabled + conf.getBoolean(ConfigUtils.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm1", "192.168.1.1:8032"); + conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm2", "192.168.1.2:8032"); + Assert.assertEquals(getHostString("192.168.1.1") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, "rm1"))); + Assert.assertEquals(getHostString("192.168.1.2") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, "rm2"))); + + } + +}