flink-issues mailing list archives

From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #6203: [FLINK-9289][rest] Rework JobSubmitHandler to acce...
Date Fri, 22 Jun 2018 13:57:27 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r197452051
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -317,43 +315,51 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
     		// we have to enable queued scheduling because slot will be allocated lazily
     		jobGraph.setAllowQueuedScheduling(true);
     
    -		log.info("Requesting blob server port.");
    -		CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
    -
    -		CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
    -			getDispatcherAddress(),
    -			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
    -				final int blobServerPort = response.port;
    -				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
    +		CompletableFuture<JobSubmitResponseBody> submissionFuture = CompletableFuture.supplyAsync(
    +			() -> {
    +				log.info("Submitting job graph.");
     
    -				List<Path> userJars = jobGraph.getUserJars();
     				Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
    -				if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
    -					try (BlobClient client = new BlobClient(address, flinkConfig)) {
    -						log.info("Uploading jar files.");
    -						ClientUtils.uploadAndSetUserJars(jobGraph, client);
    -						log.info("Uploading jar artifacts.");
    -						ClientUtils.uploadAndSetUserArtifacts(jobGraph, client);
    -					} catch (IOException ioe) {
    -						throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
    +
    +				List<String> jarFileNames = new ArrayList<>(8);
    +				List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
    +				Collection<FileUpload> filesToUpload = new ArrayList<>(8);
    +
    +				// TODO: need configurable location
    +				final String jobGraphFileName;
    +				try {
    +					final java.nio.file.Path tempFile = Files.createTempFile("flink-jobgraph", ".bin");
    +					try (OutputStream fileOut = Files.newOutputStream(tempFile)) {
    +						try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
    +							objectOut.writeObject(jobGraph);
    +						}
     					}
    +					filesToUpload.add(new FileUpload(tempFile, RestConstants.CONTENT_TYPE_BINARY));
    +					jobGraphFileName = tempFile.getFileName().toString();
    +				} catch (IOException e) {
    +					throw new RuntimeException("lol", e);
     				}
     
    -				return jobGraph;
    -			});
    -
    -		CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
    -			(JobGraph jobGraphToSubmit) -> {
    -				log.info("Submitting job graph.");
    +				for (Path jar : jobGraph.getUserJars()) {
    +					jarFileNames.add(jar.getName());
    +					filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
    +				}
     
    -				try {
    -					return sendRequest(
    -						JobSubmitHeaders.getInstance(),
    -						new JobSubmitRequestBody(jobGraph));
    -				} catch (IOException ioe) {
    -					throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
    +				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
    +					artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifacts.getValue().filePath));
    +					filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
     				}
    -			});
    +
    +				return sendRetriableRequest(
    +					JobSubmitHeaders.getInstance(),
    +					EmptyMessageParameters.getInstance(),
    +					new JobSubmitRequestBody(
    +						jobGraphFileName,
    +						jarFileNames,
    +						artifactFileNames),
    +					filesToUpload,
    +					isConnectionProblemOrServiceUnavailable());
    --- End diff --
    
    I think it would be better to clean up the generated `JobGraph` file after we've sent the request.
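
One way this could look, as a rough sketch only: attach the cleanup to the future returned by the request, so the temp file is removed whether the submission succeeds or fails. The class and the helpers `submitWithCleanup` and `deleteQuietly` are hypothetical names for illustration and are not part of Flink; the `sendRequest` function stands in for whatever call actually uploads the file. Only standard JDK calls are used.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch, not Flink code: delete the serialized JobGraph temp file
// once the request future has completed.
public class TempFileCleanupSketch {

    /**
     * Creates a temp file, hands it to sendRequest (a stand-in for the real upload call),
     * and deletes the file when the returned future completes, normally or exceptionally.
     */
    static <T> CompletableFuture<T> submitWithCleanup(
            Function<Path, CompletableFuture<T>> sendRequest) {
        final Path tempFile;
        try {
            tempFile = Files.createTempFile("flink-jobgraph", ".bin");
        } catch (IOException e) {
            // Surface the failure through the future instead of throwing.
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }

        final CompletableFuture<T> requestFuture;
        try {
            requestFuture = sendRequest.apply(tempFile);
        } catch (RuntimeException e) {
            // The request never started, so clean up right away.
            deleteQuietly(tempFile);
            throw e;
        }

        // whenComplete runs for both success and failure and passes the original result through.
        return requestFuture.whenComplete((response, failure) -> deleteQuietly(tempFile));
    }

    /** Best-effort delete; a real client would probably log through its logger instead. */
    static void deleteQuietly(Path file) {
        try {
            Files.deleteIfExists(file);
        } catch (IOException e) {
            System.err.println("Could not delete temp file " + file + ": " + e);
        }
    }
}
```

Hooking the delete onto the future, rather than placing it right after the call that starts the request, matters because the request is asynchronous: the file has to stay on disk until the upload has actually finished.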


---
