flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9499) Allow REST API for running a job to provide job configuration as body of POST request
Date Wed, 18 Jul 2018 09:04:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547573#comment-16547573 ]

ASF GitHub Bot commented on FLINK-9499:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203302981
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ---
    @@ -0,0 +1,313 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.messages.MessageParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.util.BlobServerResource;
    +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Tests for the parameter handling of the {@link JarRunHandler}.
    + */
    +public class JarRunHandlerParameterTest {
    +
    +	@ClassRule
    +	public static final TemporaryFolder TMP = new TemporaryFolder();
    +
    +	@ClassRule
    +	public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
    +
    +	private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();
    +	private static JarRunHandler handler;
    +	private static Path jarWithManifest;
    +	private static Path jarWithoutManifest;
    +	private static TestingDispatcherGateway restfulGateway;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Path jarDir = TMP.newFolder().toPath();
    +
    +		// properties are set by the surefire plugin
    +		final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar";
    +		final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
    +		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
    +
    +		jarWithManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramJarName),
    +			jarDir.resolve("program-with-manifest.jar"));
    +		jarWithoutManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramWithoutManifestJarName),
    +			jarDir.resolve("program-without-manifest.jar"));
    +
    +		Configuration config = new Configuration();
    +		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
    +			TMP.newFolder().getAbsolutePath());
    +
    +		restfulGateway = new TestingDispatcherGateway.Builder()
    +			.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
    +			.setSubmitFunction(jobGraph -> {
    +				lastSubmittedJobGraphReference.set(jobGraph);
    +				return CompletableFuture.completedFuture(Acknowledge.get());
    +			})
    +			.build();
    +		final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
    +		final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
    +		final Time timeout = Time.seconds(10);
    +		final Map<String, String> responseHeaders = Collections.emptyMap();
    +		final Executor executor = TestingUtils.defaultExecutor();
    +
    +		handler = new JarRunHandler(
    +			localAddressFuture,
    +			gatewayRetriever,
    +			timeout,
    +			responseHeaders,
    +			JarRunHeaders.getInstance(),
    +			jarDir,
    +			new Configuration(),
    +			executor);
    +	}
    +
    +	@Before
    +	public void reset() {
    +		ParameterProgram.actualArguments = null;
    +	}
    +
    +	@Test
    +	public void testDefaultParameters() throws Exception {
    +		// baseline, ensure that reasonable defaults are chosen
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> createRequest(
    +				new JarRunRequestBody(),
    +				JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +				jarWithManifest
    +			),
    +			jobGraph -> {
    +				Assert.assertEquals(0, ParameterProgram.actualArguments.length);
    +
    +				Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertNull(savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaQueryParameters() throws Exception {
    +		// configure submission via query parameters
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host localhost --port 1234"));
    +
    +				return createRequest(
    +					new JarRunRequestBody(),
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaJsonRequest() throws Exception {
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				return createRequest(
    +					jsonRequest,
    +					JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testParameterPrioritization() throws Exception {
    +		// configure submission via query parameters and JSON request, JSON should be prioritized
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host wrong --port wrong"));
    +
    +				return createRequest(
    +					jsonRequest,
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	private static HandlerRequest<JarRunRequestBody, JarRunMessageParameters> createRequest(
    +			JarRunRequestBody requestBody,
    +			JarRunMessageParameters parameters,
    +			Path jar) throws HandlerRequestException {
    +
    +		final Map<String, List<String>> queryParameterAsMap = parameters.getQueryParameters().stream()
    +			.filter(MessageParameter::isResolved)
    +			.collect(Collectors.toMap(
    +				MessageParameter::getKey,
    +				JarRunHandlerParameterTest::getValuesAsString
    +			));
    +
    +		return new HandlerRequest<>(
    +			requestBody,
    +			JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    --- End diff --
    
    `HandlerRequest` doesn't work like that.
    It takes the parameter map from netty and inserts its entries into the `MessageParameters`.
    We could move that logic into a static factory method and have the request be a simple container, but changing that would be out of scope for this PR.
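
    The refactoring idea mentioned above (a static factory that does the resolution, with the request itself being a simple container) could look roughly like the sketch below. This is only an illustration under stated assumptions: `SimpleRequest`, `create`, and `knownKeys` are hypothetical names and do not reflect Flink's actual `HandlerRequest` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for illustration only; this is not Flink's HandlerRequest.
final class SimpleRequest<B> {
    private final B body;
    private final Map<String, List<String>> queryParameters;

    // The constructor only stores values, so the request is a plain container.
    private SimpleRequest(B body, Map<String, List<String>> queryParameters) {
        this.body = body;
        this.queryParameters = queryParameters;
    }

    // Static factory: resolving the raw parameter map happens here, not in the constructor.
    // `knownKeys` is an assumed set of supported query parameter names.
    static <B> SimpleRequest<B> create(
            B body,
            Map<String, List<String>> rawQueryParameters,
            Set<String> knownKeys) {
        Map<String, List<String>> resolved = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : rawQueryParameters.entrySet()) {
            if (!knownKeys.contains(entry.getKey())) {
                throw new IllegalArgumentException("Unknown query parameter: " + entry.getKey());
            }
            // Defensive copy so later changes to the raw map do not leak into the request.
            resolved.put(entry.getKey(),
                Collections.unmodifiableList(new ArrayList<>(entry.getValue())));
        }
        return new SimpleRequest<>(body, Collections.unmodifiableMap(resolved));
    }

    B getBody() {
        return body;
    }

    // Returns the values for a key, or an empty list if the parameter was not supplied.
    List<String> getQueryParameter(String key) {
        return queryParameters.getOrDefault(key, Collections.emptyList());
    }
}
```

    With a shape like this, a test could build a request from a plain map without going through parameter resolution in the constructor.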


> Allow REST API for running a job to provide job configuration as body of POST request
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-9499
>                 URL: https://issues.apache.org/jira/browse/FLINK-9499
>             Project: Flink
>          Issue Type: Improvement
>          Components: REST
>    Affects Versions: 1.3.2
>            Reporter: Esteban Serrano
>            Assignee: Esteban Serrano
>            Priority: Minor
>              Labels: pull-request-available
>
> Based on [this|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#submitting-programs] documentation, the REST API provides a way to submit a request for running a Flink job. The POST request must include the job configuration information as query parameters using the documented parameter names ("program-args", "entry-class", "parallelism", etc.)
> Depending on the job parameters, the full URL for the POST request can exceed the maximum size allowed by the configuration of Netty (currently 4096 bytes). To overcome this, it would be useful to allow users to provide the job configuration not only as query parameters but also as POST parameters.
> For the most part, it is the "program-args" parameter that can make the URL grow in size, depending on the needs of the developer and the job. All other attributes should be fairly constant.
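
When the same setting can arrive both in the POST body and as a query parameter, the handler needs a precedence rule. The test in the diff above expects the JSON body to win. A minimal sketch of such a rule follows; `ParameterPrecedence` and `fromBodyOrQuery` are hypothetical names chosen for illustration and are not taken from the pull request.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of the Flink code base.
final class ParameterPrecedence {
    private ParameterPrecedence() {
    }

    // Returns the body value if present, otherwise the query parameter value,
    // otherwise the default. The query value is only looked up when needed.
    static <T> T fromBodyOrQuery(T bodyValue, Supplier<T> queryValue, T defaultValue) {
        if (bodyValue != null) {
            return bodyValue;
        }
        T fromQuery = queryValue.get();
        return fromQuery != null ? fromQuery : defaultValue;
    }
}
```

For example, with a body parallelism of 4 and a query parameter of 64 (the values used in `testParameterPrioritization`), `fromBodyOrQuery(4, () -> 64, 1)` yields the body value.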



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)