flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files
Date Mon, 02 Jul 2018 09:41:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529608#comment-16529608 ]

ASF GitHub Bot commented on FLINK-9280:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r199439130
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
     		// we have to enable queued scheduling because slot will be allocated lazily
     		jobGraph.setAllowQueuedScheduling(true);
     
    -		log.info("Requesting blob server port.");
    -		CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
    +		CompletableFuture<JobSubmitResponseBody> submissionFuture = CompletableFuture.supplyAsync(
    +			() -> {
    +				log.info("Submitting job graph.");
     
    -		CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
    -			getDispatcherAddress(),
    -			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
    -				final int blobServerPort = response.port;
    -				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
    +				List<String> jarFileNames = new ArrayList<>(8);
    +				List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
    +				Collection<FileUpload> filesToUpload = new ArrayList<>(8);
     
    +				// TODO: need configurable location
    +				final java.nio.file.Path jobGraphFile;
     				try {
    -					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
    -				} catch (Exception e) {
    -					throw new CompletionException(e);
    +					jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
    +					try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
    +						try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
    +							objectOut.writeObject(jobGraph);
    +						}
    +					}
    +					filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
    +				} catch (IOException e) {
    +					throw new CompletionException("Failed to serialize JobGraph.", e);
     				}
     
    -				return jobGraph;
    -			});
    -
    -		CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
    -			(JobGraph jobGraphToSubmit) -> {
    -				log.info("Submitting job graph.");
    +				for (Path jar : jobGraph.getUserJars()) {
    +					jarFileNames.add(jar.getName());
    +					filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
    +				}
     
    -				try {
    -					return sendRequest(
    -						JobSubmitHeaders.getInstance(),
    -						new JobSubmitRequestBody(jobGraph));
    -				} catch (IOException ioe) {
    -					throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
    +				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
    +					artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
    +					filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
     				}
    -			});
    +
    +				final CompletableFuture<JobSubmitResponseBody> submitFuture = sendRetriableRequest(
    --- End diff --
    
    This is a slightly more extensive change since the cleanup needs access to the `jobGraphFile`. I'll see what I can do.
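
The reviewer's point is that whatever deletes the temporary job-graph file must still see its path once the asynchronous submission finishes. Below is a minimal plain-Java sketch of one way to arrange that; it is not the change that was eventually merged. `FakeJobGraph` and the `submit` function are hypothetical stand-ins for `JobGraph` and the REST request.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class TempFileSubmissionSketch {

    /** Hypothetical stand-in for JobGraph; it only needs to be Serializable. */
    public static final class FakeJobGraph implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;

        public FakeJobGraph(String name) {
            this.name = name;
        }
    }

    /** Java-serializes the object into a new temp file, as the diff does for the JobGraph. */
    public static Path serializeToTempFile(Serializable obj) throws IOException {
        Path file = Files.createTempFile("flink-jobgraph", ".bin");
        try (OutputStream fileOut = Files.newOutputStream(file);
             ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
            objectOut.writeObject(obj);
        } catch (IOException e) {
            Files.deleteIfExists(file); // don't leak a half-written file
            throw e;
        }
        return file;
    }

    /**
     * Serializes the graph, runs the (hypothetical) submit function asynchronously,
     * and deletes the temp file once that future completes, successfully or not.
     */
    public static CompletableFuture<String> submitWithCleanup(
            Serializable graph, Function<Path, String> submit) {
        final Path graphFile;
        try {
            graphFile = serializeToTempFile(graph);
        } catch (IOException e) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new CompletionException("Failed to serialize JobGraph.", e));
            return failed;
        }
        return CompletableFuture
            .supplyAsync(() -> submit.apply(graphFile))
            .whenComplete((result, error) -> {
                try {
                    Files.deleteIfExists(graphFile);
                } catch (IOException ignored) {
                    // Best-effort cleanup; a real client would log this.
                }
            });
    }

    public static void main(String[] args) {
        Path[] seen = new Path[1];
        String result = submitWithCleanup(new FakeJobGraph("wordcount"), file -> {
            seen[0] = file;
            // Read the graph back to show the file holds the serialized object.
            try (InputStream in = Files.newInputStream(file);
                 ObjectInputStream objectIn = new ObjectInputStream(in)) {
                return "submitted " + ((FakeJobGraph) objectIn.readObject()).name;
            } catch (IOException | ClassNotFoundException e) {
                throw new CompletionException(e);
            }
        }).join();
        System.out.println(result);                // submitted wordcount
        System.out.println(Files.exists(seen[0])); // false
    }
}
```

Because `whenComplete` runs on both success and failure, the file is removed even if the request fails. Unlike the diff, this sketch serializes eagerly on the calling thread, so `graphFile` lives in one scope that the cleanup stage can capture.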


> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
>                 Key: FLINK-9280
>                 URL: https://issues.apache.org/jira/browse/FLINK-9280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Job-Submission, REST
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob server, sets the blob keys in the jobgraph, and then uploads this graph to the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the blob server before the job graph is submitted, and that upload does not happen via REST.
> I propose extending the {{JobSubmitHandler}} to also accept an optional list of jar files that were previously uploaded through the {{JarUploadHandler}}. If present, the handler would upload these jars to the blob server and set the blob keys.
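
The proposed handler behavior can be sketched as follows. All types here are hypothetical: `InMemoryBlobStore` stands in for the blob server, `JobGraphStub` for the job graph, and `uploadDir` for wherever the `JarUploadHandler` stored the jars. The keys are simple counters rather than real blob keys, and the actual handler's API may look quite different.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JarAwareSubmitSketch {

    /** Hypothetical stand-in for the blob server: stores file bytes under a generated key. */
    public static final class InMemoryBlobStore {
        private final Map<String, byte[]> blobs = new HashMap<>();
        private int nextId = 0;

        public String put(Path file) throws IOException {
            String key = "blob-" + (nextId++);
            blobs.put(key, Files.readAllBytes(file));
            return key;
        }

        public int size() {
            return blobs.size();
        }
    }

    /** Hypothetical stand-in for the part of the job graph that holds jar blob keys. */
    public static final class JobGraphStub {
        public final List<String> userJarBlobKeys = new ArrayList<>();
    }

    /**
     * Proposed behavior: for each jar name in the request (an empty list means
     * none were sent), locate the previously uploaded file in uploadDir, store it
     * in the blob store, and record the returned key on the job graph.
     */
    public static void attachUploadedJars(
            JobGraphStub jobGraph,
            List<String> jarFileNames,
            Path uploadDir,
            InMemoryBlobStore blobStore) throws IOException {
        Path root = uploadDir.normalize();
        for (String name : jarFileNames) {
            Path jar = root.resolve(name).normalize();
            // Reject names that escape the upload directory, e.g. "../evil.jar".
            if (!jar.startsWith(root)) {
                throw new IllegalArgumentException("Invalid jar name: " + name);
            }
            if (!Files.isRegularFile(jar)) {
                throw new NoSuchFileException(jar.toString());
            }
            jobGraph.userJarBlobKeys.add(blobStore.put(jar));
        }
    }

    public static void main(String[] args) throws IOException {
        Path uploadDir = Files.createTempDirectory("jar-uploads");
        Files.write(uploadDir.resolve("job.jar"), new byte[] {1, 2, 3});

        JobGraphStub graph = new JobGraphStub();
        InMemoryBlobStore store = new InMemoryBlobStore();
        attachUploadedJars(graph, Arrays.asList("job.jar"), uploadDir, store);
        System.out.println(graph.userJarBlobKeys); // [blob-0]
    }
}
```

Since the jar names come from the client, the sketch resolves them against the upload directory and rejects any name that normalizes to a path outside it before touching the blob store.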



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
