flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-2722] Use InetAddress.getLocalHost() as first approach when detecting the TMs own ip/hostname
Date Tue, 22 Sep 2015 13:03:29 GMT
Repository: flink
Updated Branches:
  refs/heads/master c24b8e6cb -> a3df109cf


[FLINK-2722] Use InetAddress.getLocalHost() as first approach when detecting the TMs own ip/hostname

This closes #1159


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3df109c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3df109c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3df109c

Branch: refs/heads/master
Commit: a3df109cf7d959dafea8893dcbf3bec8075f3489
Parents: c24b8e6
Author: Robert Metzger <rmetzger@apache.org>
Authored: Fri Sep 4 14:51:40 2015 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Sep 22 15:03:05 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/net/NetUtils.java  | 105 ++++---------------
 .../org/apache/flink/yarn/FlinkYarnCluster.java |   4 +-
 2 files changed, 25 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3df109c/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 46c07fa..2df0616 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -53,6 +53,8 @@ public class NetUtils {
 	 * There is only a state transition if the current state failed to determine the address.
 	 */
 	private enum AddressDetectionState {
+		/** Connect from interface returned by InetAddress.getLocalHost() **/
+		LOCAL_HOST(50),
 		/** Detect own IP address based on the target IP address. Look for common prefix */
 		ADDRESS(50),
 		/** Try to connect on all Interfaces and all their addresses with a low timeout */
@@ -73,87 +75,6 @@ public class NetUtils {
 		}
 	}
 
-	/**
-	 * Find out the TaskManager's own IP address, simple version.
-	 */
-	public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) throws IOException {
-		AddressDetectionState strategy = jobManagerAddress != null ? AddressDetectionState.ADDRESS: AddressDetectionState.HEURISTIC;
-
-		while (true) {
-			Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
-
-			while (e.hasMoreElements()) {
-				NetworkInterface n = e.nextElement();
-				Enumeration<InetAddress> ee = n.getInetAddresses();
-
-				while (ee.hasMoreElements()) {
-					InetAddress i = ee.nextElement();
-
-					switch (strategy) {
-						case ADDRESS:
-							if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
-								if (tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true)) {
-									LOG.info("Determined {} as the machine's own IP address", i);
-									return i;
-								}
-							}
-							break;
-
-						case FAST_CONNECT:
-						case SLOW_CONNECT:
-							boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true);
-							if (correct) {
-								LOG.info("Determined {} as the machine's own IP address", i);
-								return i;
-							}
-							break;
-
-						case HEURISTIC:
-							if (LOG.isDebugEnabled()) {
-								LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
-										" isLinkLocalAddress:" + i.isLinkLocalAddress() +
-										" isLoopbackAddress:" + i.isLoopbackAddress() + ".");
-							}
-
-							if (!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
-								LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
-										"loopback address. Using instead " + i.getHostAddress() + " on network " +
-										"interface " + n.getName() + ".");
-								return i;
-							}
-							break;
-
-						default:
-							throw new RuntimeException("Unknown address detection strategy: " + strategy);
-					}
-				}
-			}
-			// state control
-			switch (strategy) {
-				case ADDRESS:
-					strategy = AddressDetectionState.FAST_CONNECT;
-					break;
-				case FAST_CONNECT:
-					strategy = AddressDetectionState.SLOW_CONNECT;
-					break;
-				case SLOW_CONNECT:
-					if (!InetAddress.getLocalHost().isLoopbackAddress()) {
-						LOG.info("Heuristically taking " + InetAddress.getLocalHost() + " as own " +
-								"IP address.");
-						return InetAddress.getLocalHost();
-					} else {
-						strategy = AddressDetectionState.HEURISTIC;
-						break;
-					}
-				case HEURISTIC:
-					throw new RuntimeException("Unable to resolve own inet address by connecting " +
-							"to address (" + jobManagerAddress + ").");
-			}
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Defaulting to detection strategy " + strategy);
-			}
-		}
-	}
 
 	/**
 	 * Finds the local network address from which this machine can connect to the target
@@ -191,7 +112,7 @@ public class NetUtils {
 
 		// loop while there is time left
 		while (elapsedTime < maxWaitMillis) {
-			AddressDetectionState strategy = AddressDetectionState.ADDRESS;
+			AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
 
 			boolean logging = elapsedTime >= startLoggingAfter;
 			if (logging) {
@@ -206,6 +127,9 @@ public class NetUtils {
 
 				// pick the next strategy
 				switch (strategy) {
+					case LOCAL_HOST:
+						strategy = AddressDetectionState.ADDRESS;
+						break;
 					case ADDRESS:
 						strategy = AddressDetectionState.FAST_CONNECT;
 						break;
@@ -262,6 +186,18 @@ public class NetUtils {
 														InetSocketAddress targetAddress,
 														boolean logging) throws IOException
 	{
+		// try LOCAL_HOST strategy independent of the network interfaces
+		if(strategy == AddressDetectionState.LOCAL_HOST) {
+			InetAddress localhostName = InetAddress.getLocalHost();
+
+			if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
+				LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
+				return localhostName;
+			} else {
+				return null;
+			}
+		}
+
 		final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();
 
 		// for each network interface
@@ -464,7 +400,7 @@ public class NetUtils {
 					}
 
 					if (targetAddress != null) {
-						AddressDetectionState strategy = AddressDetectionState.ADDRESS;
+						AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
 
 						boolean logging = elapsedTime >= startLoggingAfter.toMillis();
 						if (logging) {
@@ -479,6 +415,9 @@ public class NetUtils {
 
 							// pick the next strategy
 							switch (strategy) {
+								case LOCAL_HOST:
+									strategy = AddressDetectionState.ADDRESS;
+									break;
 								case ADDRESS:
 									strategy = AddressDetectionState.FAST_CONNECT;
 									break;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3df109c/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 310fb83..57a5010 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -140,7 +140,9 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 
 		// start actor system
 		LOG.info("Start actor system.");
-		InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
+		// find name of own public interface, able to connect to the JM
+		// try to find address for 2 seconds. log after 400 ms.
+		InetAddress ownHostname = NetUtils.findConnectingAddress(jobManagerAddress, 2000, 400);
 		actorSystem = AkkaUtils.createActorSystem(flinkConfig,
 				new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
 


Mime
View raw message