flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Date Fri, 29 Jun 2018 15:37:37 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r199179398
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
---
    @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader
classLoader)
     		// we have to enable queued scheduling because slot will be allocated lazily
     		jobGraph.setAllowQueuedScheduling(true);
     
    -		log.info("Requesting blob server port.");
    -		CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
    +		CompletableFuture<JobSubmitResponseBody> submissionFuture = CompletableFuture.supplyAsync(
    +			() -> {
    +				log.info("Submitting job graph.");
     
    -		CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
    -			getDispatcherAddress(),
    -			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
    -				final int blobServerPort = response.port;
    -				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
    +				List<String> jarFileNames = new ArrayList<>(8);
    +				List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
    +				Collection<FileUpload> filesToUpload = new ArrayList<>(8);
     
    +				// TODO: need configurable location
    +				final java.nio.file.Path jobGraphFile;
     				try {
    -					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
    -				} catch (Exception e) {
    -					throw new CompletionException(e);
    +					jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
    +					try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
    +						try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
    +							objectOut.writeObject(jobGraph);
    +						}
    +					}
    +					filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
    +				} catch (IOException e) {
    +					throw new CompletionException("Failed to serialize JobGraph.", e);
    --- End diff --
    
    I think the failure message `"Failed to serialize JobGraph` should go to a dedicated exception
because completion exceptions can be filtered out. `throw new CompletionException(new FlinkException("Failed
to serialize JobGraph.", e))`


---

Mime
View raw message