flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3074) Make ApplicationMaster/JobManager akka port configurable
Date Wed, 09 Dec 2015 10:29:10 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048440#comment-15048440
] 

ASF GitHub Bot commented on FLINK-3074:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1416#discussion_r47073661
  
    --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
---
    @@ -120,18 +125,76 @@ abstract class ApplicationMasterBase {
             config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
           }
     
    -      val (actorSystem, jmActor, archiveActor, webMonitor) =
    +      // we try to start the JobManager actor system using the port definition
    +      // from the config.
    +      // first, we check if the port is available by opening a socket
    +      // if the actor system fails to start on the port, we try further
    +      val amPortRange: String = config.getString(ConfigConstants.YARN_APPLICATION_MASTER_PORT,
    +        ConfigConstants.DEFAULT_YARN_APPLICATION_MASTER_PORT)
    +      val portsIterator = NetUtils.getPortRangeFromString(amPortRange)
    +
    +      // method to start the actor system.
    +      def startActorSystem(portsIterator: java.util.Iterator[Integer]): // return type
-> next line
    +        (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
    +        val availableSocket = NetUtils.createSocketFromPorts(portsIterator,
    +          new NetUtils.SocketFactory {
    +            override def createSocket(port: Int): ServerSocket = new ServerSocket(port)
    +          })
    +
    +        // get port as integer and close socket
    +       val tryPort = if (availableSocket == null) {
    +          throw new BindException(s"Unable to allocate port for ApplicationMaster in
" +
    +            s"specified port range: $amPortRange ")
    +        } else {
    +          val port = availableSocket.getLocalPort
    +          availableSocket.close()
    +          port // return for if
    +        }
    +
             JobManager.startActorSystemAndJobManagerActors(
               config,
               JobManagerMode.CLUSTER,
               streamingMode,
               ownHostname,
    -          0,
    +          tryPort,
               getJobManagerClass,
               getArchivistClass
             )
    +      }
    +
    +      @tailrec
    +      def retry[T](fn: => T, stopCond: => Boolean): Try[T] = {
    --- End diff --
    
    Very nice and scalaesque solution :+1: 


> Make ApplicationMaster/JobManager akka port configurable
> --------------------------------------------------------
>
>                 Key: FLINK-3074
>                 URL: https://issues.apache.org/jira/browse/FLINK-3074
>             Project: Flink
>          Issue Type: Improvement
>          Components: YARN Client
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>             Fix For: 1.0.0
>
>
> Similar to the BlobServer, the YARN ApplicationMaster should allow starting it on a specified
list or range of ports.
> In cases where only certain ports are allowed by a firewall, users can specify a range
of ports where they want the AM to allocate its RPC port



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message