Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 14E0D17EBD for ; Tue, 22 Sep 2015 13:03:33 +0000 (UTC) Received: (qmail 15818 invoked by uid 500); 22 Sep 2015 13:03:30 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 15794 invoked by uid 500); 22 Sep 2015 13:03:29 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 15785 invoked by uid 99); 22 Sep 2015 13:03:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Sep 2015 13:03:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CB656DFF72; Tue, 22 Sep 2015 13:03:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-2722] Use InetAddress.getLocalHost() as first approach when detecting the TMs own ip/hostname Date: Tue, 22 Sep 2015 13:03:29 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master c24b8e6cb -> a3df109cf [FLINK-2722] Use InetAddress.getLocalHost() as first approach when detecting the TMs own ip/hostname This closes #1159 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3df109c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3df109c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3df109c Branch: refs/heads/master Commit: a3df109cf7d959dafea8893dcbf3bec8075f3489 Parents: c24b8e6 Author: Robert Metzger Authored: Fri Sep 4 14:51:40 2015 +0200 Committer: Robert Metzger Committed: Tue Sep 22 15:03:05 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/runtime/net/NetUtils.java | 105 ++++--------------- .../org/apache/flink/yarn/FlinkYarnCluster.java | 4 +- 2 files changed, 25 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a3df109c/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java index 46c07fa..2df0616 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java @@ -53,6 +53,8 @@ public class NetUtils { * There is only a state transition if the current state failed to determine the address. */ private enum AddressDetectionState { + /** Connect from interface returned by InetAddress.getLocalHost() **/ + LOCAL_HOST(50), /** Detect own IP address based on the target IP address. Look for common prefix */ ADDRESS(50), /** Try to connect on all Interfaces and all their addresses with a low timeout */ @@ -73,87 +75,6 @@ public class NetUtils { } } - /** - * Find out the TaskManager's own IP address, simple version. - */ - public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) throws IOException { - AddressDetectionState strategy = jobManagerAddress != null ? AddressDetectionState.ADDRESS: AddressDetectionState.HEURISTIC; - - while (true) { - Enumeration e = NetworkInterface.getNetworkInterfaces(); - - while (e.hasMoreElements()) { - NetworkInterface n = e.nextElement(); - Enumeration ee = n.getInetAddresses(); - - while (ee.hasMoreElements()) { - InetAddress i = ee.nextElement(); - - switch (strategy) { - case ADDRESS: - if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) { - if (tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true)) { - LOG.info("Determined {} as the machine's own IP address", i); - return i; - } - } - break; - - case FAST_CONNECT: - case SLOW_CONNECT: - boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true); - if (correct) { - LOG.info("Determined {} as the machine's own IP address", i); - return i; - } - break; - - case HEURISTIC: - if (LOG.isDebugEnabled()) { - LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" + - " isLinkLocalAddress:" + i.isLinkLocalAddress() + - " isLoopbackAddress:" + i.isLoopbackAddress() + "."); - } - - if (!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){ - LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " + - "loopback address. Using instead " + i.getHostAddress() + " on network " + - "interface " + n.getName() + "."); - return i; - } - break; - - default: - throw new RuntimeException("Unknown address detection strategy: " + strategy); - } - } - } - // state control - switch (strategy) { - case ADDRESS: - strategy = AddressDetectionState.FAST_CONNECT; - break; - case FAST_CONNECT: - strategy = AddressDetectionState.SLOW_CONNECT; - break; - case SLOW_CONNECT: - if (!InetAddress.getLocalHost().isLoopbackAddress()) { - LOG.info("Heuristically taking " + InetAddress.getLocalHost() + " as own " + - "IP address."); - return InetAddress.getLocalHost(); - } else { - strategy = AddressDetectionState.HEURISTIC; - break; - } - case HEURISTIC: - throw new RuntimeException("Unable to resolve own inet address by connecting " + - "to address (" + jobManagerAddress + ")."); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Defaulting to detection strategy " + strategy); - } - } - } /** * Finds the local network address from which this machine can connect to the target @@ -191,7 +112,7 @@ public class NetUtils { // loop while there is time left while (elapsedTime < maxWaitMillis) { - AddressDetectionState strategy = AddressDetectionState.ADDRESS; + AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST; boolean logging = elapsedTime >= startLoggingAfter; if (logging) { @@ -206,6 +127,9 @@ public class NetUtils { // pick the next strategy switch (strategy) { + case LOCAL_HOST: + strategy = AddressDetectionState.ADDRESS; + break; case ADDRESS: strategy = AddressDetectionState.FAST_CONNECT; break; @@ -262,6 +186,18 @@ public class NetUtils { InetSocketAddress targetAddress, boolean logging) throws IOException { + // try LOCAL_HOST strategy independent of the network interfaces + if(strategy == AddressDetectionState.LOCAL_HOST) { + InetAddress localhostName = InetAddress.getLocalHost(); + + if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) { + LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address"); + return localhostName; + } else { + return null; + } + } + final byte[] targetAddressBytes = targetAddress.getAddress().getAddress(); // for each network interface @@ -464,7 +400,7 @@ public class NetUtils { } if (targetAddress != null) { - AddressDetectionState strategy = AddressDetectionState.ADDRESS; + AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST; boolean logging = elapsedTime >= startLoggingAfter.toMillis(); if (logging) { @@ -479,6 +415,9 @@ public class NetUtils { // pick the next strategy switch (strategy) { + case LOCAL_HOST: + strategy = AddressDetectionState.ADDRESS; + break; case ADDRESS: strategy = AddressDetectionState.FAST_CONNECT; break; http://git-wip-us.apache.org/repos/asf/flink/blob/a3df109c/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java index 310fb83..57a5010 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java @@ -140,7 +140,9 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { // start actor system LOG.info("Start actor system."); - InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM + // find name of own public interface, able to connect to the JM + // try to find address for 2 seconds. log after 400 ms. + InetAddress ownHostname = NetUtils.findConnectingAddress(jobManagerAddress, 2000, 400); actorSystem = AkkaUtils.createActorSystem(flinkConfig, new Some(new Tuple2(ownHostname.getCanonicalHostName(), 0)));