flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4273) Refactor JobClientActor to watch already submitted jobs
Date Fri, 19 Aug 2016 13:41:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428199#comment-15428199 ]

ASF GitHub Bot commented on FLINK-4273:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75482760
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    --- End diff --
    
    The class loader is needed not only for the job execution result but also for retrieving the accumulators of a running job.
    
    I agree that it would be nice to fail when the class loader can't be reconstructed, but *only* if failing is really the only option. So we could start off with the class loader set to `None` in the `JobListeningContext` and fetch it when it is first needed, i.e. for accumulator retrieval or job execution result retrieval.
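
A minimal sketch of the lazy approach suggested above, assuming a simplified stand-in for `JobListeningContext`. The class `LazyClassLoaderContext`, its exception type, and the `Supplier`-based retriever are hypothetical names chosen for illustration and are not part of Flink. The point is only that attaching to a job does not fail up front, and a retrieval failure surfaces at the moment the class loader is actually needed.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for JobListeningContext; not Flink's actual class. */
class LazyClassLoaderContext {

    /** Hypothetical exception raised when the class loader cannot be reconstructed. */
    static class ClassLoaderRetrievalException extends Exception {
        ClassLoaderRetrievalException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Assumption: the retriever wraps whatever downloads the jars from the JobManager.
    private final Supplier<ClassLoader> retriever;

    // Starts out unset (the "None" state); filled in on first successful fetch.
    private ClassLoader classLoader;

    // Counts how often the retriever was invoked, to make the laziness observable.
    private int fetchAttempts;

    LazyClassLoaderContext(Supplier<ClassLoader> retriever) {
        this.retriever = retriever;
    }

    /** Fetches the class loader on first use and caches it afterwards. */
    synchronized ClassLoader getClassLoader() throws ClassLoaderRetrievalException {
        if (classLoader == null) {
            fetchAttempts++;
            try {
                ClassLoader fetched = retriever.get();
                if (fetched == null) {
                    throw new IllegalStateException("Retriever returned no class loader.");
                }
                classLoader = fetched;
            } catch (RuntimeException e) {
                // Fail only here, when the class loader is really required.
                throw new ClassLoaderRetrievalException(
                        "Could not reconstruct the user class loader.", e);
            }
        }
        return classLoader;
    }

    synchronized int getFetchAttempts() {
        return fetchAttempts;
    }
}
```

With this shape, code paths that never touch accumulators or the job execution result would never trigger the jar download, and a failed download could be retried on the next call because nothing is cached on failure.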


> Refactor JobClientActor to watch already submitted jobs 
> --------------------------------------------------------
>
>                 Key: FLINK-4273
>                 URL: https://issues.apache.org/jira/browse/FLINK-4273
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for the result. This process should be broken up into a submission process and a waiting process which can both be entered independently. This leads to two different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait
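
The two entry points in the issue description can be sketched as follows. This is an illustration only: `TwoEntryPoints`, the string job IDs, and the in-memory map standing in for the JobManager are all hypothetical and do not reflect Flink's actor-based implementation. It shows `submit` and `retrieve` producing the same kind of handle, so the waiting phase is shared.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration of "submit -> wait" and "retrieve -> wait". */
class TwoEntryPoints {

    // Assumption: an in-memory map stands in for the JobManager's view of jobs.
    private final Map<String, CompletableFuture<String>> jobs = new ConcurrentHashMap<>();

    /** Entry point 1: submit a new job and get a handle to wait on. */
    CompletableFuture<String> submit(String jobId) {
        CompletableFuture<String> handle = new CompletableFuture<>();
        if (jobs.putIfAbsent(jobId, handle) != null) {
            throw new IllegalStateException("Job already submitted: " + jobId);
        }
        return handle;
    }

    /** Entry point 2: attach to a job that was submitted earlier. */
    CompletableFuture<String> retrieve(String jobId) {
        CompletableFuture<String> handle = jobs.get(jobId);
        if (handle == null) {
            throw new IllegalArgumentException("Unknown job: " + jobId);
        }
        return handle;
    }

    /** Shared waiting phase: blocks until the job result is available. */
    static String waitForResult(CompletableFuture<String> handle) {
        return handle.join();
    }

    /** Simulates the job finishing on the cluster. */
    void finish(String jobId, String result) {
        retrieve(jobId).complete(result);
    }
}
```

Because both entry points return the same handle type, a client that attaches by job ID waits in exactly the same way as the client that submitted the job.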



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
