flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files
Date Wed, 13 Jun 2018 13:23:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511117#comment-16511117
] 

ASF GitHub Bot commented on FLINK-9280:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195075314
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
     				executor);
     	}
     
    +	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends
RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
    +			String targetAddress,
    +			int targetPort,
    +			M messageHeaders,
    +			U messageParameters,
    +			R request,
    +			Collection<Path> jars,
    +			Collection<Path> userArtifacts) throws IOException {
    +		Preconditions.checkNotNull(targetAddress);
    +		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The
target port " + targetPort + " is not in the range (0, 65536].");
    +		Preconditions.checkNotNull(messageHeaders);
    +		Preconditions.checkNotNull(request);
    +		Preconditions.checkNotNull(messageParameters);
    +		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not
resolved.");
    +
    +		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(),
messageParameters);
    +
    +		LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress,
targetPort, targetUrl);
    +		// serialize payload
    +		StringWriter sw = new StringWriter();
    +		objectMapper.writeValue(sw, request);
    +		ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +		// do not load file into memory, this can have weird side-effects and break functionality
    +		HttpDataFactory factory = new DefaultHttpDataFactory(true);
    +
    +		HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(),
messageHeaders.getTargetRestEndpointURL());
    +		httpRequest.headers()
    +			.set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
    +			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
    +
    +		// takes care of splitting the request into multiple parts
    +		HttpPostRequestEncoder bodyRequestEncoder;
    +		try {
    +			bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
    +
    +			Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
    +			requestAttribute.setContent(payload);
    +			bodyRequestEncoder.addBodyHttpData(requestAttribute);
    +
    +			addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE,
bodyRequestEncoder);
    +			addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE,
bodyRequestEncoder);
    +
    +			bodyRequestEncoder.finalizeRequest();
    --- End diff --
    
    If it's not a multi-part request, then we should send the `HttpRequest` which is returned
here.


> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
>                 Key: FLINK-9280
>                 URL: https://issues.apache.org/jira/browse/FLINK-9280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Job-Submission, REST
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>             Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob server,
sets the blob keys in the jobgraph, and then uploads this graph to The {{JobSubmitHandler}}
which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the blobserver
before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an optional list
of jar files, that were previously uploaded through the {{JarUploadHandler}}. If present,
the handler would upload these jars to the blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message