flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-1559] [akka] Normalize all akka URLs to use IP addresses rather than hostnames
Date Mon, 16 Feb 2015 19:40:28 GMT
[FLINK-1559] [akka] Normalize all akka URLs to use IP addresses rather than hostnames


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dcff4c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dcff4c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dcff4c1

Branch: refs/heads/master
Commit: 2dcff4c122f37ca58a485e886fec9295fef0c832
Parents: b941cf2
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 16 19:48:32 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 16 19:50:33 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 33 ++++++++++++++------
 .../apache/flink/runtime/client/JobClient.scala |  5 +--
 .../flink/runtime/jobmanager/JobManager.scala   | 14 ++-------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  6 +++-
 .../flink/runtime/taskmanager/TaskManager.scala | 20 +++++++-----
 .../src/test/resources/log4j-test.properties    | 14 ++++-----
 .../org/apache/flink/yarn/FlinkYarnCluster.java |  2 +-
 .../apache/flink/yarn/ApplicationClient.scala   |  4 ++-
 .../scala/org/apache/flink/yarn/Messages.scala  |  3 +-
 9 files changed, 58 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2dcff4c1/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 2c9daad..5a1a2be 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.akka
 
 import java.io.IOException
+import java.net.InetAddress
 import java.util.concurrent.{TimeUnit, Callable}
 
 import akka.actor.Actor.Receive
@@ -94,9 +95,12 @@ object AkkaUtils {
     val defaultConfig = getBasicAkkaConfig(configuration)
 
     listeningAddress match {
+
       case Some((hostname, port)) =>
-        val remoteConfig = getRemoteAkkaConfig(configuration, hostname, port)
+        val ipAddress = InetAddress.getByName(hostname).getHostAddress()
+        val remoteConfig = getRemoteAkkaConfig(configuration, ipAddress, port)
         remoteConfig.withFallback(defaultConfig)
+
       case None =>
         defaultConfig
     }
@@ -174,10 +178,12 @@ object AkkaUtils {
    */
   private def getRemoteAkkaConfig(configuration: Configuration,
                                   hostname: String, port: Int): Config = {
-    val akkaAskTimeout = Duration(configuration.getString(ConfigConstants.AKKA_ASK_TIMEOUT,
+    val akkaAskTimeout = Duration(configuration.getString(
+      ConfigConstants.AKKA_ASK_TIMEOUT,
       ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
 
-    val startupTimeout = configuration.getString(ConfigConstants.AKKA_STARTUP_TIMEOUT,
+    val startupTimeout = configuration.getString(
+      ConfigConstants.AKKA_STARTUP_TIMEOUT,
       akkaAskTimeout.toString)
 
     val transportHeartbeatInterval = configuration.getString(
@@ -188,25 +194,32 @@ object AkkaUtils {
       ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_PAUSE,
       ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE)
 
-    val transportThreshold = configuration.getDouble(ConfigConstants.AKKA_TRANSPORT_THRESHOLD,
+    val transportThreshold = configuration.getDouble(
+      ConfigConstants.AKKA_TRANSPORT_THRESHOLD,
       ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD)
 
     val watchHeartbeatInterval = configuration.getString(
-        ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, (akkaAskTimeout/10).toString)
+      ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL,
+      (akkaAskTimeout/10).toString)
 
-    val watchHeartbeatPause = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
+    val watchHeartbeatPause = configuration.getString(
+      ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
       akkaAskTimeout.toString)
 
-    val watchThreshold = configuration.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD,
+    val watchThreshold = configuration.getDouble(
+      ConfigConstants.AKKA_WATCH_THRESHOLD,
       ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD)
 
-    val akkaTCPTimeout = configuration.getString(ConfigConstants.AKKA_TCP_TIMEOUT,
+    val akkaTCPTimeout = configuration.getString(
+      ConfigConstants.AKKA_TCP_TIMEOUT,
       akkaAskTimeout.toString)
 
-    val akkaFramesize = configuration.getString(ConfigConstants.AKKA_FRAMESIZE,
+    val akkaFramesize = configuration.getString(
+      ConfigConstants.AKKA_FRAMESIZE,
       ConfigConstants.DEFAULT_AKKA_FRAMESIZE)
 
-    val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
+    val lifecycleEvents = configuration.getBoolean(
+      ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
       ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
 
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcff4c1/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 60aefe4..34df4eb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.client
 
 import java.io.IOException
-import java.net.InetSocketAddress
+import java.net.{InetAddress, InetSocketAddress}
 
 import akka.actor.Status.Failure
 import akka.actor._
@@ -172,7 +172,8 @@ object JobClient {
           "JobManager address has not been specified in the configuration.")
       }
 
-      JobManager.getRemoteJobManagerAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
+      val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress), jobManagerRPCPort)
+      JobManager.getRemoteJobManagerAkkaURL(hostPort)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcff4c1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1682443..d522d2d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -849,17 +849,6 @@ object JobManager {
   // --------------------------------------------------------------------------
 
   /**
-   * Builds the akka actor path for the JobManager actor, given the address (host:port)
-   * where the JobManager's actor system runs.
-   *
-   * @param address The address (host:port) of the JobManager's actor system.
-   * @return The akka URL of the JobManager actor.
-   */
-  def getRemoteJobManagerAkkaURL(address: String): String = {
-    s"akka.tcp://flink@$address/user/$JOB_MANAGER_NAME"
-  }
-
-  /**
    * Builds the akka actor path for the JobManager actor, given the socket address
    * where the JobManager's actor system runs.
    *
@@ -867,7 +856,8 @@ object JobManager {
    * @return The akka URL of the JobManager actor.
    */
   def getRemoteJobManagerAkkaURL(address: InetSocketAddress): String = {
-    getRemoteJobManagerAkkaURL(address.getAddress().getHostAddress() + ":" + address.getPort)
+    val hostPort = address.getAddress().getHostAddress() + ":" + address.getPort()
+    s"akka.tcp://flink@$hostPort/user/$JOB_MANAGER_NAME"
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcff4c1/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index dd158cb..8f79003 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.minicluster
 
+import java.net.InetAddress
+
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
@@ -42,7 +44,9 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration,
                                 val singleActorSystem: Boolean) {
   import FlinkMiniCluster._
 
-  val HOSTNAME = "localhost"
+  // NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
+  // not getLocalHost(), which may be 127.0.1.1
+  val HOSTNAME = InetAddress.getByName("localhost").getHostAddress()
 
   implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcff4c1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4a0ae72..7b84928 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -821,18 +821,22 @@ object TaskManager {
     val jobManagerURL = if (localAkkaCommunication) {
       // JobManager and TaskManager are in the same ActorSystem -> Use local Akka URL
       JobManager.getLocalJobManagerAkkaURL
-    } else {
-      val jobManagerAddress = configuration.getString(ConfigConstants
-          .JOB_MANAGER_IPC_ADDRESS_KEY, null)
-      val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    }
+    else {
+      val jobManagerAddress = configuration.getString(
+        ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+
+      val jobManagerRPCPort = configuration.getInteger(
+        ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+        ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
       if (jobManagerAddress == null) {
-        throw new RuntimeException("JobManager address has not been specified in the " +
-          "configuration.")
+        throw new RuntimeException(
+          "JobManager address has not been specified in the configuration.")
       }
 
-      JobManager.getRemoteJobManagerAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
+      val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress), jobManagerRPCPort)
+      JobManager.getRemoteJobManagerAkkaURL(hostPort)
     }
 
     val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcff4c1/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 0b686e5..b2d89ff 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -16,12 +16,12 @@
 # limitations under the License.
 ################################################################################
 
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
 
 # A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcff4c1/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index abfd4a9..6fedd9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -106,7 +106,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class));
 
 		// instruct ApplicationClient to start a periodical status polling
-		applicationClient.tell(new Messages.LocalRegisterClient(jobManagerHost + ":" + jobManagerPort), applicationClient);
+		applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
 
 
 		// add hook to ensure proper shutdown

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcff4c1/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index b8079ab..684990b 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.yarn
 
+import java.net.InetSocketAddress
+
 import akka.actor._
 import org.apache.flink.configuration.GlobalConfiguration
 import org.apache.flink.runtime.ActorLogMessages
@@ -63,7 +65,7 @@ class ApplicationClient extends Actor with ActorLogMessages with ActorLogging {
 
   override def receiveWithLogMessages: Receive = {
     // ----------------------------- Registration -> Status updates -> shutdown ----------------
-    case LocalRegisterClient(address: String) =>
+    case LocalRegisterClient(address: InetSocketAddress) =>
       val jmAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(address)
 
       val jobManagerFuture = AkkaUtils.getReference(jmAkkaUrl, system, timeout)

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcff4c1/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
index 5cdbbff..0ac135d 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/Messages.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn
 
+import java.net.InetSocketAddress
 import java.util.Date
 
 import akka.actor.ActorRef
@@ -41,7 +42,7 @@ object Messages {
   case object CheckForUserCommand
 
   // Client-local messages
-  case class LocalRegisterClient(jobManagerAddress: String)
+  case class LocalRegisterClient(jobManagerAddress: InetSocketAddress)
   case object LocalGetYarnMessage // request new message
   case object LocalGetYarnClusterStatus // request the latest cluster status
 


Mime
View raw message