flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1529) Improve JobManager startup robustness
Date Fri, 13 Feb 2015 10:00:22 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14319813#comment-14319813
] 

ASF GitHub Bot commented on FLINK-1529:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24653930
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
---
    @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
             log.error(t, "Could not properly unregister job {} form the library cache.",
jobID)
         }
       }
    -
    -  private def checkJavaVersion(): Unit = {
    -    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
    -      log.warning("Warning: Flink is running with Java 6. " +
    -        "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
    -        "Flink currently supports Java 6, but may not in future releases," +
    -        " due to the unavailability of bug fixes security patched.")
    -    }
    -  }
     }
     
     object JobManager {
    +  
       import ExecutionMode._
    +
       val LOG = LoggerFactory.getLogger(classOf[JobManager])
    +
       val FAILURE_RETURN_CODE = 1
    +
       val JOB_MANAGER_NAME = "jobmanager"
       val EVENT_COLLECTOR_NAME = "eventcollector"
       val ARCHIVE_NAME = "archive"
       val PROFILER_NAME = "profiler"
     
       def main(args: Array[String]): Unit = {
    +
    +    // startup checks and logging
         EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
    -    val (configuration, executionMode, listeningAddress) = parseArgs(args)
    +    checkJavaVersion()
     
    -      if(SecurityUtils.isSecurityEnabled) {
    +    val (configuration: Configuration,
    +         executionMode: ExecutionMode,
    +         listeningAddress:  Option[(String, Int)]) =
    +    try {
    +      parseArgs(args)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error(t.getMessage(), t)
    +        System.exit(FAILURE_RETURN_CODE)
    +        null
    +      }
    +    }
    +
    +    try {
    +      if (SecurityUtils.isSecurityEnabled) {
             LOG.info("Security is enabled. Starting secure JobManager.")
             SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
               override def run(): Unit = {
    -            start(configuration, executionMode, listeningAddress)
    +            runJobManager(configuration, executionMode, listeningAddress)
               }
             })
           } else {
    -        start(configuration, executionMode, listeningAddress)
    +        runJobManager(configuration, executionMode, listeningAddress)
    +      }
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error("Failed to start JobManager.", t)
    +        System.exit(FAILURE_RETURN_CODE)
           }
    +    }
       }
     
    -  def start(configuration: Configuration, executionMode: ExecutionMode,
    -            listeningAddress : Option[(String, Int)]): Unit = {
    -    val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress)
     
    -    startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem)
    +  def runJobManager(configuration: Configuration,
    +                    executionMode: ExecutionMode,
    +                    listeningAddress: Option[(String, Int)]) : Unit = {
    +
    +    LOG.info("Starting JobManager")
    +    LOG.debug("Starting JobManager actor system")
     
    -    if(executionMode.equals(LOCAL)){
    -      TaskManager.startActorWithConfiguration("", configuration,
    -        localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
    +    val jobManagerSystem = try {
    +      AkkaUtils.createActorSystem(configuration, listeningAddress)
         }
    +    catch {
    +      case t: Throwable => {
    +        if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
    +          val cause = t.getCause()
    +          if (cause != null && t.getCause().isInstanceOf[java.net.BindException])
{
    +            val address = listeningAddress match {
    +              case Some((host, port)) => host + ":" + port
    +              case None => "unknown"
    +            }
     
    -    jobManagerSystem.awaitTermination()
    +            throw new Exception("Unable to create JobManager at address " + address +
": " + cause.getMessage(), t)
    +          }
    +        }
    +        throw new Exception("Could not create JobManager actor system", t)
    +      }
    +    }
    +
    +    try {
    +      LOG.debug("Starting JobManager actor")
    +
    +      startActor(configuration, jobManagerSystem)
    +
    +      if(executionMode.equals(LOCAL)){
    +        LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution")
    +
    +        TaskManager.startActorWithConfiguration("", configuration,
    +          localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
    +      }
    +
    +      jobManagerSystem.awaitTermination()
    +    }
    +    catch {
    +      case t: Throwable => {
    +        Try(jobManagerSystem.shutdown())
    --- End diff --
    
    I'll log them


> Improve JobManager startup robustness
> -------------------------------------
>
>                 Key: FLINK-1529
>                 URL: https://issues.apache.org/jira/browse/FLINK-1529
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.9
>
>
> Currently, the JobManager is created asynchronously (as an actor). If its initialization
fails (for various reasons), the process does not terminate and gives only a vague log message
that an actor creation failed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message