flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9499) Allow REST API for running a job to provide job configuration as body of POST request
Date Wed, 18 Jul 2018 12:32:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547752#comment-16547752 ]

ASF GitHub Bot commented on FLINK-9499:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203359445
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ---
    @@ -0,0 +1,313 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.messages.MessageParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.util.BlobServerResource;
    +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Tests for the parameter handling of the {@link JarRunHandler}.
    + */
    +public class JarRunHandlerParameterTest {
    +
    +	@ClassRule
    +	public static final TemporaryFolder TMP = new TemporaryFolder();
    +
    +	@ClassRule
    +	public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
    +
    +	private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();
    +	private static JarRunHandler handler;
    +	private static Path jarWithManifest;
    +	private static Path jarWithoutManifest;
    +	private static TestingDispatcherGateway restfulGateway;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Path jarDir = TMP.newFolder().toPath();
    +
    +		// properties are set properly by the surefire plugin
    +		final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar";
    +		final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
    +		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
    +
    +		jarWithManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramJarName),
    +			jarDir.resolve("program-with-manifest.jar"));
    +		jarWithoutManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramWithoutManifestJarName),
    +			jarDir.resolve("program-without-manifest.jar"));
    +
    +		Configuration config = new Configuration();
    +		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
    +			TMP.newFolder().getAbsolutePath());
    +
    +		restfulGateway = new TestingDispatcherGateway.Builder()
    +			.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
    +			.setSubmitFunction(jobGraph -> {
    +				lastSubmittedJobGraphReference.set(jobGraph);
    +				return CompletableFuture.completedFuture(Acknowledge.get());
    +			})
    +			.build();
    +		final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
    +		final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
    +		final Time timeout = Time.seconds(10);
    +		final Map<String, String> responseHeaders = Collections.emptyMap();
    +		final Executor executor = TestingUtils.defaultExecutor();
    +
    +		handler = new JarRunHandler(
    +			localAddressFuture,
    +			gatewayRetriever,
    +			timeout,
    +			responseHeaders,
    +			JarRunHeaders.getInstance(),
    +			jarDir,
    +			new Configuration(),
    +			executor);
    +	}
    +
    +	@Before
    +	public void reset() {
    +		ParameterProgram.actualArguments = null;
    +	}
    +
    +	@Test
    +	public void testDefaultParameters() throws Exception {
    +		// baseline, ensure that reasonable defaults are chosen
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> createRequest(
    +				new JarRunRequestBody(),
    +				JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +				jarWithManifest
    +			),
    +			jobGraph -> {
    +				Assert.assertEquals(0, ParameterProgram.actualArguments.length);
    +
    +				Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertNull(savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaQueryParameters() throws Exception {
    +		// configure submission via query parameters
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host localhost --port 1234"));
    +
    +				return createRequest(
    +					new JarRunRequestBody(),
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaJsonRequest() throws Exception {
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				return createRequest(
    +					jsonRequest,
    +					JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testParameterPrioritization() throws Exception {
    +		// configure submission via query parameters and JSON request, JSON should be prioritized
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host wrong --port wrong"));
    +
    +				return createRequest(
    +					jsonRequest,
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	private static HandlerRequest<JarRunRequestBody, JarRunMessageParameters> createRequest(
    +			JarRunRequestBody requestBody,
    +			JarRunMessageParameters parameters,
    +			Path jar) throws HandlerRequestException {
    +
    +		final Map<String, List<String>> queryParameterAsMap = parameters.getQueryParameters().stream()
    +			.filter(MessageParameter::isResolved)
    +			.collect(Collectors.toMap(
    +				MessageParameter::getKey,
    +				JarRunHandlerParameterTest::getValuesAsString
    +			));
    +
    +		return new HandlerRequest<>(
    +			requestBody,
    +			JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    --- End diff --
    
    Ah, I see. You're right. I think initializing the `MessageParameter` instance before passing it to the `HandlerRequest` could be a nice improvement, but that is out of scope for this PR.
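
    For readers following the diff above: `testParameterPrioritization` expects values from the JSON request body to win over query parameters. A minimal sketch of that precedence rule, assuming a body value counts as "set" when it is non-null. The class `ParameterPrecedenceSketch` and its `resolve` method are hypothetical names for illustration, not the handler's actual code.

```java
/** Hypothetical helper, not Flink's implementation: picks the effective value of one setting. */
final class ParameterPrecedenceSketch {

	/** Returns the body value if set, else the query parameter value if set, else the default. */
	static <T> T resolve(T fromBody, T fromQuery, T defaultValue) {
		if (fromBody != null) {
			return fromBody;
		}
		if (fromQuery != null) {
			return fromQuery;
		}
		return defaultValue;
	}

	public static void main(String[] args) {
		// Mirrors the test scenario: body says 4, query parameter says 64.
		Integer parallelism = resolve(4, 64, 1);
		String savepointPath = resolve("/foo/bar", "/no/uh", null);
		System.out.println("parallelism = " + parallelism);
		System.out.println("savepointPath = " + savepointPath);
	}
}
```

    Treating null as "not set" keeps the rule uniform across settings; whether the real handler distinguishes an absent field from an explicit null is not something this sketch claims.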


> Allow REST API for running a job to provide job configuration as body of POST request
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-9499
>                 URL: https://issues.apache.org/jira/browse/FLINK-9499
>             Project: Flink
>          Issue Type: Improvement
>          Components: REST
>    Affects Versions: 1.3.2
>            Reporter: Esteban Serrano
>            Assignee: Esteban Serrano
>            Priority: Minor
>              Labels: pull-request-available
>
> Based on [this|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#submitting-programs] documentation, the REST API provides a way to submit a request for running a Flink job. The POST request must include the job configuration information as query parameters using the documented parameter names ("program-args", "entry-class", "parallelism", etc.).
> Depending on the job parameters, the full URL for the POST request can exceed the maximum size (currently 4096 bytes) allowed by the Netty configuration. To overcome this, it would be useful to allow users to provide the job configuration not only as query parameters but also as POST parameters.
> For the most part, it is the "program-args" parameter that can make the URL grow in size, depending on the needs of the developer and the job. All other attributes should be fairly constant.
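
The size problem described in the issue can be illustrated with a small, self-contained sketch. It assumes the 4096-byte figure quoted above applies to the whole request URL. The class `RequestSizeSketch`, the host, the port and the jar name are placeholders, and the JSON field name `programArgs` is an assumption rather than a confirmed part of the request format.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of Flink: compares two ways of passing program arguments. */
final class RequestSizeSketch {

	// Limit quoted in the issue description, treated here as a plain constant.
	static final int MAX_URL_BYTES = 4096;

	/** Builds the request URL with the arguments sent as the "program-args" query parameter. */
	static String urlWithQueryParameter(String baseUrl, String programArgs) {
		return baseUrl + "?program-args=" + URLEncoder.encode(programArgs, StandardCharsets.UTF_8);
	}

	/** Builds a JSON body carrying the same arguments; the field name is an assumption. */
	static String jsonBody(String programArgs) {
		// Escape backslashes first, then double quotes, so the value stays a valid JSON string.
		String escaped = programArgs.replace("\\", "\\\\").replace("\"", "\\\"");
		return "{\"programArgs\":\"" + escaped + "\"}";
	}

	/** Checks whether the UTF-8 encoded URL is longer than the assumed limit. */
	static boolean exceedsLimit(String url) {
		return url.getBytes(StandardCharsets.UTF_8).length > MAX_URL_BYTES;
	}

	public static void main(String[] args) {
		String baseUrl = "http://localhost:8081/jars/example.jar/run"; // placeholder address
		StringBuilder programArgs = new StringBuilder();
		for (int i = 0; i < 500; i++) {
			programArgs.append("--key").append(i).append(" value").append(i).append(' ');
		}
		String longArgs = programArgs.toString();

		// With query parameters the URL grows with the arguments; with a body it does not.
		System.out.println("query-parameter URL over limit: " + exceedsLimit(urlWithQueryParameter(baseUrl, longArgs)));
		System.out.println("body-based URL over limit: " + exceedsLimit(baseUrl));
		System.out.println("body size in bytes: " + jsonBody(longArgs).getBytes(StandardCharsets.UTF_8).length);
	}
}
```

With the arguments in the body, the URL length stays fixed no matter how long "program-args" becomes, which is the motivation given in the issue.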



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
