flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [11/16] flink git commit: [FLINK-5918] [runtime] port range support for taskmanager.rpc.port
Date Sat, 01 Jul 2017 10:06:44 GMT
[FLINK-5918] [runtime] port range support for taskmanager.rpc.port

This closes #3416.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f0ac26e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f0ac26e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f0ac26e

Branch: refs/heads/master
Commit: 3f0ac26e9f502f9e032af0375d52c5e4af2126f3
Parents: c84a828
Author: fengyelei <fengyelei@huawei.com>
Authored: Mon Feb 27 09:51:22 2017 +0800
Committer: zentol <chesnay@apache.org>
Committed: Sat Jul 1 10:04:05 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  2 +-
 .../flink/configuration/ConfigConstants.java    |  7 +-
 .../flink/configuration/TaskManagerOptions.java |  8 ++
 .../java/org/apache/flink/util/NetUtils.java    | 13 +++
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 51 +++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 54 +-----------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 13 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     | 14 +--
 .../flink/runtime/taskmanager/TaskManager.scala | 89 +++++++++++++++++---
 .../TaskManagerConfigurationTest.java           | 33 +++++---
 10 files changed, 196 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 8a6f67d..1511f34 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -247,7 +247,7 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `taskmanager.hostname`: The hostname of the network interface that the TaskManager binds
to. By default, the TaskManager searches for network interfaces that can connect to the JobManager
and other TaskManagers. This option can be used to define a hostname if that strategy fails
for some reason. Because different TaskManagers need different values for this option, it
usually is specified in an additional non-shared TaskManager-specific config file.
 
-- `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **0**, which lets the OS
choose a free port).
+- `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **0**, which lets the OS
choose a free port). Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200")
or a combination of both. It is recommended to set a range of ports to avoid collisions when
multiple TaskManagers are running on the same machine.
 
 - `taskmanager.data.port`: The task manager's port used for data exchange operations (DEFAULT:
**0**, which lets the OS choose a free port).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d467dfa..a7a883f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -185,8 +185,9 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_HOSTNAME_KEY = "taskmanager.hostname";
 
 	/**
-	 * The config parameter defining the task manager's IPC port from the configuration.
+	 * @deprecated use {@link TaskManagerOptions#RPC_PORT} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port";
 
 	/**
@@ -1243,9 +1244,9 @@ public final class ConfigConstants {
 	public static final String DEFAULT_BLOB_SERVER_PORT = "0";
 
 	/**
-	 * The default network port the task manager expects incoming IPC connections. The {@code
0} means that
-	 * the TaskManager searches for a free port.
+	 * @deprecated use {@link TaskManagerOptions#RPC_PORT} instead
 	 */
+	@Deprecated
 	public static final int DEFAULT_TASK_MANAGER_IPC_PORT = 0;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index bde564a..fef0975 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -57,6 +57,14 @@ public class TaskManagerOptions {
 			key("taskmanager.exit-on-fatal-akka-error")
 			.defaultValue(false);
 
+	/**
+	 * The default network port range on which the task manager expects incoming IPC connections. The
{@code "0"} means that
+	 * the TaskManager searches for a free port.
+	 */
+	public static final ConfigOption<String> RPC_PORT = 
+		key("taskmanager.rpc.port")
+			.defaultValue("0");
+
 	// ------------------------------------------------------------------------
 	//  Managed Memory Options
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index d4437e4..f56b452 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -319,11 +319,24 @@ public class NetUtils {
 			int dashIdx = range.indexOf('-');
 			if (dashIdx == -1) {
 				// only one port in range:
+				final int port = Integer.valueOf(range);
+				if (port < 0 || port > 65535) {
+					throw new IllegalConfigurationException("Invalid port configuration. Port must be between
0" +
+						"and 65535, but was " + port + ".");
+				}
 				rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
 			} else {
 				// evaluate range
 				final int start = Integer.valueOf(range.substring(0, dashIdx));
+				if (start < 0 || start > 65535) {
+					throw new IllegalConfigurationException("Invalid port configuration. Port must be between
0" +
+						"and 65535, but was " + start + ".");
+				}
 				final int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
+				if (end < 0 || end > 65535) {
+					throw new IllegalConfigurationException("Invalid port configuration. Port must be between
0" +
+						"and 65535, but was " + end + ".");
+				}
 				rangeIterator = new Iterator<Integer>() {
 					int i = start;
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 2f8445a..b74a9a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -29,9 +29,11 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{AkkaOptions, Configuration, SecurityOptions}
 import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
+import org.jboss.netty.channel.ChannelException
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
 import org.slf4j.LoggerFactory
 
+import scala.annotation.tailrec
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -682,5 +684,54 @@ object AkkaUtils {
   def getLocalAkkaURL(actorName: String): String = {
     "akka://flink/user/" + actorName
   }
+
+  /**
+    * Retries a function if it fails because of a [[java.net.BindException]].
+    *
+    * @param fn The function to retry
+    * @param stopCond Flag to signal termination
+    * @param maxSleepBetweenRetries Max random sleep time between retries
+    * @tparam T Return type of the function to retry
+    * @return Return value of the function to retry
+    */
+  @tailrec
+  def retryOnBindException[T](
+      fn: => T,
+      stopCond: => Boolean,
+      maxSleepBetweenRetries : Long = 0 )
+    : scala.util.Try[T] = {
+
+    def sleepBeforeRetry() : Unit = {
+      if (maxSleepBetweenRetries > 0) {
+        val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]
+        LOG.info(s"Retrying after bind exception. Sleeping for $sleepTime ms.")
+        Thread.sleep(sleepTime)
+      }
+    }
+
+    scala.util.Try {
+      fn
+    } match {
+      case scala.util.Failure(x: BindException) =>
+        if (stopCond) {
+          scala.util.Failure(x)
+        } else {
+          sleepBeforeRetry()
+          retryOnBindException(fn, stopCond)
+        }
+      case scala.util.Failure(x: Exception) => x.getCause match {
+        case c: ChannelException =>
+          if (stopCond) {
+            scala.util.Failure(new RuntimeException(
+              "Unable to do further retries starting the actor system"))
+          } else {
+            sleepBeforeRetry()
+            retryOnBindException(fn, stopCond)
+          }
+        case _ => scala.util.Failure(x)
+      }
+      case f => f
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 071dd02..100199b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -86,9 +86,7 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
-import org.jboss.netty.channel.ChannelException
 
-import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent._
@@ -2114,7 +2112,7 @@ object JobManager {
       listeningPortRange: java.util.Iterator[Integer])
     : Unit = {
 
-    val result = retryOnBindException({
+    val result = AkkaUtils.retryOnBindException({
       // Try all ports in the range until successful
       val socket = NetUtils.createSocketFromPorts(
         listeningPortRange,
@@ -2146,56 +2144,6 @@ object JobManager {
   }
 
   /**
-    * Retries a function if it fails because of a [[java.net.BindException]].
-    *
-    * @param fn The function to retry
-    * @param stopCond Flag to signal termination
-    * @param maxSleepBetweenRetries Max random sleep time between retries
-    * @tparam T Return type of the the function to retry
-    * @return Return value of the the function to retry
-    */
-  @tailrec
-  def retryOnBindException[T](
-      fn: => T,
-      stopCond: => Boolean,
-      maxSleepBetweenRetries : Long = 0 )
-    : scala.util.Try[T] = {
-
-    def sleepBeforeRetry() : Unit = {
-      if (maxSleepBetweenRetries > 0) {
-        val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]
-        LOG.info(s"Retrying after bind exception. Sleeping for $sleepTime ms.")
-        Thread.sleep(sleepTime)
-      }
-    }
-
-    scala.util.Try {
-      fn
-    } match {
-      case scala.util.Failure(x: BindException) =>
-        if (stopCond) {
-          scala.util.Failure(new RuntimeException(
-            "Unable to do further retries starting the actor system"))
-        } else {
-          sleepBeforeRetry()
-          retryOnBindException(fn, stopCond)
-        }
-      case scala.util.Failure(x: Exception) => x.getCause match {
-        case c: ChannelException =>
-          if (stopCond) {
-            scala.util.Failure(new RuntimeException(
-              "Unable to do further retries starting the actor system"))
-          } else {
-            sleepBeforeRetry()
-            retryOnBindException(fn, stopCond)
-          }
-        case _ => scala.util.Failure(x)
-      }
-      case f => f
-    }
-  }
-
-  /**
     * Starts the JobManager actor system.
     *
     * @param configuration Configuration to use for the job manager actor system

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index d66d106..a829059 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions,
TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener,
Leader
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+import org.apache.flink.util.NetUtils
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -250,10 +251,14 @@ abstract class FlinkMiniCluster(
   }
 
   def getTaskManagerAkkaConfig(index: Int): Config = {
-    val port = originalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-                                                ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+    val portRange = originalConfiguration.getString(TaskManagerOptions.RPC_PORT)
 
-    val resolvedPort = if(port != 0) port + index else port
+    val portRangeIterator = NetUtils.getPortRangeFromString(portRange)
+
+    val resolvedPort = if (portRangeIterator.hasNext) {
+      val port = portRangeIterator.next()
+      if (port > 0) port + index else 0
+    } else 0
 
     AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index a535388..a3e1c78 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices,
TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.util.NetUtils
 
 import scala.concurrent.{Await, ExecutionContext}
 import scala.concurrent.duration.FiniteDuration
@@ -203,16 +204,19 @@ class LocalFlinkMiniCluster(
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
     val config = originalConfiguration.clone()
 
-    val rpcPort = config.getInteger(
-      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+    val rpcPortRange = config.getString(TaskManagerOptions.RPC_PORT)
+
+    val rpcPortIterator = NetUtils.getPortRangeFromString(rpcPortRange)
 
     val dataPort = config.getInteger(
       ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
 
-    if (rpcPort > 0) {
-      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
+    if (rpcPortIterator.hasNext) {
+      val rpcPort = rpcPortIterator.next()
+      if (rpcPort > 0) {
+        config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
+      }
     }
     if (dataPort > 0) {
       config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7684a6b..0c419eb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, FileInputStream, IOException}
 import java.lang.management.ManagementFactory
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket}
 import java.util
 import java.util.concurrent.{Callable, TimeUnit}
 import java.util.{Collections, UUID}
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices,
TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
+import org.apache.flink.util.NetUtils
 
 import scala.collection.JavaConverters._
 import scala.concurrent._
@@ -1665,7 +1666,7 @@ object TaskManager {
       Executors.directExecutor(),
       AddressResolution.TRY_ADDRESS_RESOLUTION)
 
-    val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(
+    val (taskManagerHostname, actorSystemPortRange) = selectNetworkInterfaceAndPortRange(
       configuration,
       highAvailabilityServices)
 
@@ -1673,7 +1674,7 @@ object TaskManager {
       runTaskManager(
         taskManagerHostname,
         resourceID,
-        actorSystemPort,
+        actorSystemPortRange,
         configuration,
         highAvailabilityServices,
         taskManagerClass)
@@ -1688,10 +1689,10 @@ object TaskManager {
 
   @throws(classOf[IOException])
   @throws(classOf[IllegalConfigurationException])
-  def selectNetworkInterfaceAndPort(
+  def selectNetworkInterfaceAndPortRange(
       configuration: Configuration,
       highAvailabilityServices: HighAvailabilityServices)
-    : (String, Int) = {
+    : (String, java.util.Iterator[Integer]) = {
 
     var taskManagerHostname = configuration.getString(
       ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
@@ -1713,15 +1714,20 @@ object TaskManager {
     }
 
     // if no task manager port has been configured, use 0 (system will pick any free port)
-    val actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
0)
-    if (actorSystemPort < 0 || actorSystemPort > 65535) {
-      throw new IllegalConfigurationException("Invalid value for '" +
-        ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
-        "' (port for the TaskManager actor system) : " + actorSystemPort +
-        " - Leave config parameter empty or use 0 to let the system choose a port automatically.")
+    val portRange = configuration.getString(TaskManagerOptions.RPC_PORT)
+
+    val portRangeIterator = try {
+      NetUtils.getPortRangeFromString(portRange)
+    } catch {
+      case _: NumberFormatException =>
+        throw new IllegalConfigurationException("Invalid value for '" +
+          TaskManagerOptions.RPC_PORT.key() +
+          "' (port for the TaskManager actor system) : " + portRange +
+          " - Leave config parameter empty or use 0 to let the system choose a port automatically.")
     }
 
-    (taskManagerHostname, actorSystemPort)
+    (taskManagerHostname, portRangeIterator)
+
   }
 
   /**
@@ -1876,6 +1882,65 @@ object TaskManager {
   }
 
   /**
+    * Starts and runs the TaskManager with all its components, trying to bind to
+    * a port in the specified range.
+    *
+    * @param taskManagerHostname The hostname/address of the interface where the actor system
+    *                         will communicate.
+    * @param resourceID The id of the resource which the task manager will run on.
+    * @param actorSystemPortRange The range of ports the actor system will try to bind to.
+    * @param configuration The configuration for the TaskManager.
+    * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
+    *                         subclasses for example for YARN.
+    */
+  @throws(classOf[Exception])
+  def runTaskManager(
+    taskManagerHostname: String,
+    resourceID: ResourceID,
+    actorSystemPortRange: java.util.Iterator[Integer],
+    configuration: Configuration,
+    highAvailabilityServices: HighAvailabilityServices,
+    taskManagerClass: Class[_ <: TaskManager])
+    : Unit = {
+
+    val result = AkkaUtils.retryOnBindException({
+      // Try all ports in the range until successful
+      val socket = NetUtils.createSocketFromPorts(
+        actorSystemPortRange,
+        new NetUtils.SocketFactory {
+          override def createSocket(port: Int): ServerSocket = new ServerSocket(
+            // Use the correct listening address, bound ports will only be
+            // detected later by Akka.
+            port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
+        })
+
+      val port =
+        if (socket == null) {
+          throw new BindException(s"Unable to allocate port for TaskManager.")
+        } else {
+          try {
+            socket.getLocalPort()
+          } finally {
+            socket.close()
+          }
+        }
+
+      runTaskManager(
+        taskManagerHostname,
+        resourceID,
+        port,
+        configuration,
+        highAvailabilityServices,
+        taskManagerClass)
+    }, { !actorSystemPortRange.hasNext }, 5000)
+
+    result match {
+      case scala.util.Failure(f) => throw f
+      case _ =>
+    }
+  }
+
+  /**
    * Starts the task manager actor.
    *
    * @param configuration The configuration for the TaskManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index a760760..69cadfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -36,6 +37,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.lang.reflect.Field;
 import java.net.*;
+import java.util.Iterator;
 import java.util.UUID;
 
 import static org.junit.Assert.*;
@@ -62,8 +64,7 @@ public class TaskManagerConfigurationTest {
 
 		try {
 
-
-			Tuple2<String, Object> address = TaskManager.selectNetworkInterfaceAndPort(config,
highAvailabilityServices);
+			Tuple2<String, Iterator<Integer>> address = TaskManager.selectNetworkInterfaceAndPortRange(config,
highAvailabilityServices);
 
 			// validate the configured test host name
 			assertEquals(TEST_HOST_NAME, address._1());
@@ -91,17 +92,29 @@ public class TaskManagerConfigurationTest {
 
 		try {
 			// auto port
-			assertEquals(0, TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._2());
+			Iterator<Integer> portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config,
highAvailabilityServices)._2();
+			assertTrue(portsIter.hasNext());
+			assertEquals(0, (int) portsIter.next());
 
 			// pre-defined port
 			final int testPort = 22551;
-			config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, testPort);
-			assertEquals(testPort, TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._2());
+			config.setString(TaskManagerOptions.RPC_PORT, String.valueOf(testPort));
+
+			portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2();
+			assertTrue(portsIter.hasNext());
+			assertEquals(testPort, (int) portsIter.next());
+
+			// port range
+			config.setString(TaskManagerOptions.RPC_PORT, "8000-8001");
+			portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2();
+			assertTrue(portsIter.hasNext());
+			assertEquals(8000, (int) portsIter.next());
+			assertEquals(8001, (int) portsIter.next());
 
 			// invalid port
 			try {
-				config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
-				TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
+				config.setString(TaskManagerOptions.RPC_PORT, "-1");
+				TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices);
 				fail("should fail with an exception");
 			}
 			catch (IllegalConfigurationException e) {
@@ -110,8 +123,8 @@ public class TaskManagerConfigurationTest {
 
 			// invalid port
 			try {
-				config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 100000);
-				TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
+				config.setString(TaskManagerOptions.RPC_PORT, "100000");
+				TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices);
 				fail("should fail with an exception");
 			}
 			catch (IllegalConfigurationException e) {
@@ -180,7 +193,7 @@ public class TaskManagerConfigurationTest {
 			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 		try {
-			assertNotNull(TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._1());
+			assertNotNull(TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._1());
 		}
 		catch (Exception e) {
 			e.printStackTrace();


Mime
View raw message