flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #5223: [FLINK-8317][flip6] Implement Triggering of Savepo...
Date Fri, 12 Jan 2018 14:28:42 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5223#discussion_r161234791
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -185,37 +205,19 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
     		}
     	}
     
    -	private JobResult waitForJobExecutionResult(
    -			final JobID jobId) throws ProgramInvocationException {
    -
    -		final JobMessageParameters messageParameters = new JobMessageParameters();
    -		messageParameters.jobPathParameter.resolve(jobId);
    -		JobExecutionResultResponseBody jobExecutionResultResponseBody;
    -		try {
    -			long attempt = 0;
    -			do {
    -				final CompletableFuture<JobExecutionResultResponseBody> responseFuture =
    -					restClient.sendRequest(
    -						restClusterClientConfiguration.getRestServerAddress(),
    -						restClusterClientConfiguration.getRestServerPort(),
    -						JobExecutionResultHeaders.getInstance(),
    -						messageParameters);
    -				jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    -				Thread.sleep(waitStrategy.sleepTime(attempt));
    -				attempt++;
    -			}
    -			while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
    -		} catch (IOException | TimeoutException | ExecutionException e) {
    -			throw new ProgramInvocationException(e);
    -		} catch (InterruptedException e) {
    -			Thread.currentThread().interrupt();
    -			throw new ProgramInvocationException(e);
    +	private <R, T extends AsynchronouslyCreatedResource<R>> R waitForResource(
    +			final SupplierWithException<CompletableFuture<T>, IOException> resourceFutureSupplier)
    +				throws IOException, InterruptedException, ExecutionException, TimeoutException {
    +		T asynchronouslyCreatedResource;
    +		long attempt = 0;
    +		do {
    +			final CompletableFuture<T> responseFuture = resourceFutureSupplier.get();
    +			asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    +			Thread.sleep(waitStrategy.sleepTime(attempt));
    --- End diff --
    
    True


---

Mime
View raw message