flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Date Fri, 29 Jun 2018 15:37:57 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r199178551
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
---
    @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader
classLoader)
     		// we have to enable queued scheduling because slot will be allocated lazily
     		jobGraph.setAllowQueuedScheduling(true);
     
    -		log.info("Requesting blob server port.");
    -		CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
    +		CompletableFuture<JobSubmitResponseBody> submissionFuture = CompletableFuture.supplyAsync(
    +			() -> {
    +				log.info("Submitting job graph.");
     
    -		CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
    -			getDispatcherAddress(),
    -			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
    -				final int blobServerPort = response.port;
    -				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
    +				List<String> jarFileNames = new ArrayList<>(8);
    +				List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
    +				Collection<FileUpload> filesToUpload = new ArrayList<>(8);
     
    +				// TODO: need configurable location
    +				final java.nio.file.Path jobGraphFile;
     				try {
    -					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
    -				} catch (Exception e) {
    -					throw new CompletionException(e);
    +					jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
    +					try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
    +						try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
    --- End diff --
    
    I think we could combine these to `try` statement to `new ObjectOutputStream(Files.newOutputStream(jobGraphFile))`


---

Mime
View raw message