flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Date Mon, 26 Jan 2015 11:17:20 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/328#discussion_r23523674
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
       }
     
       private def tryJobManagerRegistration(): Unit = {
    -    registrationAttempts = 0
    -    import context.dispatcher
    -    registrationScheduler = Some(context.system.scheduler.schedule(
    -      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
    -      self, RegisterAtJobManager))
    +    registrationDuration = 0 seconds
    +
    +    registered = false
    +
    +    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
       }
     
       override def receiveWithLogMessages: Receive = {
         case RegisterAtJobManager => {
    -      registrationAttempts += 1
    +      if(!registered) {
    +        registrationDuration += registrationDelay
    +        // double delay for exponential backoff
    +        registrationDelay *= 2
     
    -      if (registered) {
    -        registrationScheduler.foreach(_.cancel())
    -      }
    -      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
    +        if (registrationDuration > maxRegistrationDuration) {
    +          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
     
    -        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
    -          registrationAttempts)
    -        val jobManager = context.actorSelection(jobManagerAkkaURL)
    +            maxRegistrationDuration)
     
    -        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
    -      }
    -      else {
    -        log.error("TaskManager could not register at JobManager.");
    -        self ! PoisonPill
    +          self ! PoisonPill
    +        } else if (!registered) {
    +          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}.
" +
    +            s"Attempt")
    +          val jobManager = context.actorSelection(jobManagerAkkaURL)
    +
    +          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
    +
    +          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
    +        }
           }
         }
     
         case AcknowledgeRegistration(id, blobPort) => {
    -      if (!registered) {
    +      if(!registered) {
    +        finishRegistration(id, blobPort)
             registered = true
    -        currentJobManager = sender
    -        instanceID = id
    -
    -        context.watch(currentJobManager)
    -
    -        log.info("TaskManager successfully registered at JobManager {}.",
    -          currentJobManager.path.toString)
    -
    -        setupNetworkEnvironment()
    -        setupLibraryCacheManager(blobPort)
    +      } else {
    +        if (log.isDebugEnabled) {
    +          log.debug("The TaskManager {} is already registered at the JobManager {}, but
received " +
    +            "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
    +        }
    +      }
    +    }
     
    -        heartbeatScheduler = Some(context.system.scheduler.schedule(
    -          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
    +    case AlreadyRegistered(id, blobPort) =>
    +      if(!registered) {
    +        log.warning("The TaskManager {} seems to be already registered at the JobManager
{} even" +
    +          "though it has not yet finished the registration process.", self.path, sender.path)
     
    -        profiler foreach {
    -          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
    +        finishRegistration(id, blobPort)
    +        registered = true
    +      } else {
    +        // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
    +        if(log.isDebugEnabled){
    +          log.debug("The TaskManager {} has already been registered at the JobManager
{}.",
    +            self.path, sender.path)
             }
    +      }
     
    -        for (listener <- waitForRegistration) {
    -          listener ! RegisteredAtJobManager
    -        }
    +    case RefuseRegistration(reason) =>
    +      if(!registered) {
    +        log.error("The registration of task manager {} was refused by the job manager
{} " +
    +          "because {}.", self.path, jobManagerAkkaURL, reason)
     
    -        waitForRegistration.clear()
    +        // Shut task manager down
    +        self ! PoisonPill
    +      } else {
    +        // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
    +        if(log.isDebugEnabled) {
    --- End diff --
    
    I was wondering whether this gives us any valuable information for bug-tracking purposes.
You're right that it should not happen too often, and thus it probably won't hurt too much.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message