flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4273) Refactor JobClientActor to watch already submitted jobs
Date Thu, 18 Aug 2016 13:20:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426426#comment-15426426
] 

ASF GitHub Bot commented on FLINK-4273:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75305767
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
---
    @@ -58,10 +62,63 @@ class JobInfo(
         }
       }
     
    -  override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)"
    +
    +  /**
    +    * Notifies all clients by sending a message
    +    * @param message the message to send
    +    */
    +  def notifyClients(message: Any) = {
    +    clients foreach {
    +      case (clientActor, _) =>
    +        clientActor ! message
    +    }
    +  }
    +
    +  /**
    +    * Notifies all clients which are not of type detached
    +    * @param message the message to sent to non-detached clients
    +    */
    +  def notifyNonDetachedClients(message: Any) = {
    +    clients foreach {
    +      case (clientActor, ListeningBehaviour.DETACHED) =>
    +        // do nothing
    +      case (clientActor, _) =>
    +        clientActor ! message
    +    }
    +  }
    +
    +  /**
    +    * Sends a message to job clients that match the listening behavior
    +    * @param message the message to send to all clients
    +    * @param listeningBehaviour the desired listening behaviour
    +    */
    +  def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = {
    +    clients foreach {
    +      case (clientActor, `listeningBehaviour`) =>
    +        clientActor ! message
    +      case _ =>
    +    }
    +  }
     
       def setLastActive() =
         lastActive = System.currentTimeMillis()
    +
    +
    +  override def toString = s"JobInfo(clients: ${clients.toString()}, start: $start)"
    +
    +  override def equals(other: Any): Boolean = other match {
    +    case that: JobInfo =>
    +      this.isInstanceOf[JobInfo] &&
    --- End diff --
    
    Why do we need this check here?


> Refactor JobClientActor to watch already submitted jobs 
> --------------------------------------------------------
>
>                 Key: FLINK-4273
>                 URL: https://issues.apache.org/jira/browse/FLINK-4273
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for the result.
This process should be broken up into a submission process and a waiting process which can
both be entered independently. This leads to two different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message