flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...
Date Mon, 08 Jan 2018 12:25:22 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5207#discussion_r160127783
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
---
    @@ -104,11 +122,26 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader
classLoad
     		} catch (JobSubmissionException e) {
     			throw new ProgramInvocationException(e);
     		}
    -		// don't return just a JobSubmissionResult here, the signature is lying
    -		// The CliFrontend expects this to be a JobExecutionResult
     
    -		// TOOD: do not exit this method until job is finished
    -		return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap());
    +		final JobExecutionResult jobExecutionResult = waitForJobExecutionResult(jobGraph.getJobID());
    +
    +		if (jobExecutionResult.getSerializedThrowable().isPresent()) {
    +			final SerializedThrowable serializedThrowable = jobExecutionResult.getSerializedThrowable().get();
    +			final Throwable throwable = serializedThrowable.deserializeError(classLoader);
    +			throw new ProgramInvocationException(throwable);
    +		}
    +
    +		try {
    +			// don't return just a JobSubmissionResult here, the signature is lying
    +			// The CliFrontend expects this to be a JobExecutionResult
    +			this.lastJobExecutionResult = new SerializedJobExecutionResult(
    +				jobExecutionResult.getJobId(),
    +				jobExecutionResult.getNetRuntime(),
    +				jobExecutionResult.getAccumulatorResults()).toJobExecutionResult(classLoader);
    --- End diff --
    
    We could also directly use `AccumulatorHelper#deserializeAccumulators`.


---

Mime
View raw message