flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Date Fri, 29 Jun 2018 15:38:02 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r199190292
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
---
    @@ -54,18 +69,89 @@ public JobSubmitHandler(
     
     	@Override
     	protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody,
EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException
{
    -		JobGraph jobGraph;
    -		try {
    -			ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
    -			jobGraph = (JobGraph) objectIn.readObject();
    -		} catch (Exception e) {
    -			throw new RestHandlerException(
    -				"Failed to deserialize JobGraph.",
    -				HttpResponseStatus.BAD_REQUEST,
    -				e);
    +		Collection<Path> uploadedFiles = request.getUploadedFiles();
    +		Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
    +			path -> path.getFileName().toString(),
    +			entry -> entry
    +		));
    +
    +		JobSubmitRequestBody requestBody = request.getRequestBody();
    +
    +		Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph",
nameToFile);
    +
    +		Collection<org.apache.flink.core.fs.Path> jarFiles = new ArrayList<>(requestBody.jarFileNames.size());
    +		for (String jarFileName : requestBody.jarFileNames) {
    +			Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile);
    +			jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString()));
    +		}
    +
    +		Collection<Tuple2<String, org.apache.flink.core.fs.Path>> artifacts = new
ArrayList<>(requestBody.artifactFileNames.size());
    +		for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames)
{
    +			Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact",
nameToFile);
    +			artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString())));
     		}
     
    -		return gateway.submitJob(jobGraph, timeout)
    -			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +		Map<String, DistributedCache.DistributedCacheEntry> temporaryHack = artifacts.stream()
    +			.collect(Collectors.toMap(
    +				tuple -> tuple.f0,
    +				// the actual entry definition is mostly irrelevant as only the blobkey is accessed
    +				// blame whoever wrote the ClientUtils API
    +				tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
    +			));
    +
    +		// TODO: use executor
    +		CompletableFuture<JobGraph> jobGraphFuture = CompletableFuture.supplyAsync(()
-> {
    +			JobGraph jobGraph;
    +			try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile)))
{
    +				jobGraph = (JobGraph) objectIn.readObject();
    +			} catch (Exception e) {
    +				throw new CompletionException(new RestHandlerException(
    +					"Failed to deserialize JobGraph.",
    +					HttpResponseStatus.BAD_REQUEST,
    +					e));
    +			}
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
    +
    +		CompletableFuture<JobGraph> finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture,
(jobGraph, blobServerPort) -> {
    +			final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
    +			try (BlobClient blobClient = new BlobClient(address, new Configuration())) {
    +				Collection<PermanentBlobKey> jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(),
jarFiles, blobClient);
    +				ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph);
    +
    +				Collection<Tuple2<String, PermanentBlobKey>> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(),
temporaryHack, blobClient);
    +				ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys);
    +			} catch (IOException e) {
    +				throw new CompletionException(new RestHandlerException(
    +					"Could not upload job files.",
    +					HttpResponseStatus.INTERNAL_SERVER_ERROR,
    +					e));
    +			}
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph
-> gateway.submitJob(jobGraph, timeout));
    +
    +		return jobSubmissionFuture.thenCombine(jobGraphFuture,
    +			(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +	}
    --- End diff --
    
    Could we maybe further split this method up into `loadJobGraph`, `uploadJobGraphFiles`
and `submitJobGraph`?


---

Mime
View raw message