flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [3/3] flink git commit: [FLINK-7521][flip6] Return HTTP 413 if request limit is exceeded.
Date Wed, 14 Mar 2018 08:40:45 GMT
[FLINK-7521][flip6] Return HTTP 413 if request limit is exceeded.

Remove unnecessary PipelineErrorHandler from RestClient.
Rename config keys for configuring request and response limits.
Set response headers for all error responses.

This closes #5685.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da3fc4fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da3fc4fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da3fc4fd

Branch: refs/heads/master
Commit: da3fc4fde2796af262dd275f3ea87a5b7bc69c5a
Parents: 9905700
Author: gyao <gary@data-artisans.com>
Authored: Mon Mar 12 23:16:25 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Mar 14 09:37:51 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/RestOptions.java |  18 +--
 .../dispatcher/DispatcherRestEndpoint.java      |   2 -
 .../runtime/rest/FlinkHttpObjectAggregator.java |  67 +++++++++++
 .../apache/flink/runtime/rest/RestClient.java   |  16 ++-
 .../runtime/rest/RestClientConfiguration.java   |   8 +-
 .../flink/runtime/rest/RestServerEndpoint.java  |  10 +-
 .../rest/RestServerEndpointConfiguration.java   |  34 +++++-
 .../rest/handler/PipelineErrorHandler.java      |  16 ++-
 .../rest/handler/RestHandlerConfiguration.java  |  22 +---
 .../runtime/rest/handler/RouterHandler.java     |  13 ++-
 .../runtime/rest/handler/util/HandlerUtils.java |  56 ++++++++-
 .../runtime/webmonitor/WebMonitorEndpoint.java  |   2 -
 .../runtime/rest/RestServerEndpointITCase.java  | 113 ++++++++++++++++---
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |   2 +-
 ...ExecutionAttemptAccumulatorsHandlerTest.java |   3 +-
 ...btaskExecutionAttemptDetailsHandlerTest.java |   2 +-
 16 files changed, 304 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 61bb085..94d7977 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -83,17 +83,19 @@ public class RestOptions {
 			.withDescription("The maximum time in ms for the client to establish a TCP connection.");
 
 	/**
-	 * The max content length that the server will handle.
+	 * The maximum content length that the server will handle.
 	 */
-	public static final ConfigOption<Integer> REST_SERVER_CONTENT_MAX_MB =
-		key("rest.server.content.max.mb")
-			.defaultValue(10);
+	public static final ConfigOption<Integer> REST_SERVER_MAX_CONTENT_LENGTH =
+		key("rest.server.max-content-length")
+			.defaultValue(104_857_600)
+			.withDescription("The maximum content length in bytes that the server will handle.");
 
 	/**
-	 * The max content length that the client will handle.
+	 * The maximum content length that the client will handle.
 	 */
-	public static final ConfigOption<Integer> REST_CLIENT_CONTENT_MAX_MB =
-		key("rest.client.content.max.mb")
-			.defaultValue(1);
+	public static final ConfigOption<Integer> REST_CLIENT_MAX_CONTENT_LENGTH =
+		key("rest.client.max-content-length")
+			.defaultValue(104_857_600)
+			.withDescription("The maximum content length in bytes that the client will handle.");
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 9df6dee..4518552 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -44,7 +44,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -92,7 +91,6 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 		// Add the Dispatcher specific handlers
 
 		final Time timeout = restConfiguration.getTimeout();
-		final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();
 
 		BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(
 			restAddressFuture,

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
new file mode 100644
index 0000000..4ee0256
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Same as {@link org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectDecoder}
+ * but returns HTTP 413 to the client if the payload exceeds {@link #maxContentLength}.
+ */
+public class FlinkHttpObjectAggregator extends org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator
{
+
+	private final Map<String, String> responseHeaders;
+
+	public FlinkHttpObjectAggregator(final int maxContentLength, @Nonnull final Map<String,
String> responseHeaders) {
+		super(maxContentLength);
+		this.responseHeaders = responseHeaders;
+	}
+
+	@Override
+	protected void decode(
+			final ChannelHandlerContext ctx,
+			final HttpObject msg,
+			final List<Object> out) throws Exception {
+
+		try {
+			super.decode(ctx, msg, out);
+		} catch (final TooLongFrameException e) {
+			HandlerUtils.sendErrorResponse(
+				ctx,
+				false,
+				new ErrorResponseBody(String.format(
+					e.getMessage() + " Try to raise [%s]",
+					RestOptions.REST_SERVER_MAX_CONTENT_LENGTH.key())),
+				HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE,
+				responseHeaders);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 801119d..6319634 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rest;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -50,6 +50,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandl
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
@@ -104,8 +105,7 @@ public class RestClient {
 				socketChannel.pipeline()
 					.addLast(new HttpClientCodec())
 					.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
-					.addLast(new ClientHandler())
-					.addLast(new PipelineErrorHandler(LOG));
+					.addLast(new ClientHandler());
 			}
 		};
 		NioEventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-client-netty"));
@@ -269,8 +269,14 @@ public class RestClient {
 		}
 
 		@Override
-		public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws
Exception {
-			jsonFuture.completeExceptionally(cause);
+		public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+			if (cause instanceof TooLongFrameException) {
+				jsonFuture.completeExceptionally(new TooLongFrameException(String.format(
+					cause.getMessage() + " Try to raise [%s]",
+					RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH.key())));
+			} else {
+				jsonFuture.completeExceptionally(cause);
+			}
 			ctx.close();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index 782cb4e..17d4264 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -29,6 +29,8 @@ import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A configuration object for {@link RestClient}s.
  */
@@ -45,6 +47,7 @@ public final class RestClientConfiguration {
 			@Nullable final SSLEngine sslEngine,
 			final long connectionTimeout,
 			final int maxContentLength) {
+		checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
 		this.sslEngine = sslEngine;
 		this.connectionTimeout = connectionTimeout;
 		this.maxContentLength = maxContentLength;
@@ -104,10 +107,7 @@ public final class RestClientConfiguration {
 
 		final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
 
-		int maxContentLength = config.getInteger(RestOptions.REST_CLIENT_CONTENT_MAX_MB) * 1024
* 1024;
-		if (maxContentLength <= 0) {
-			throw new ConfigurationException("Max content length for client must be a positive integer:
" + maxContentLength);
-		}
+		int maxContentLength = config.getInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH);
 
 		return new RestClientConfiguration(sslEngine, connectionTimeout, maxContentLength);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 8b39250..a3d4843 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -37,7 +37,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
@@ -59,6 +58,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
@@ -78,6 +78,7 @@ public abstract class RestServerEndpoint {
 	private final SSLEngine sslEngine;
 	private final Path uploadDir;
 	private final int maxContentLength;
+	protected final Map<String, String> responseHeaders;
 
 	private final CompletableFuture<Void> terminationFuture;
 
@@ -97,6 +98,7 @@ public abstract class RestServerEndpoint {
 		createUploadDir(uploadDir, log);
 
 		this.maxContentLength = configuration.getMaxContentLength();
+		this.responseHeaders = configuration.getResponseHeaders();
 
 		terminationFuture = new CompletableFuture<>();
 
@@ -148,7 +150,7 @@ public abstract class RestServerEndpoint {
 
 				@Override
 				protected void initChannel(SocketChannel ch) {
-					Handler handler = new RouterHandler(router);
+					Handler handler = new RouterHandler(router, responseHeaders);
 
 					// SSL should be the first handler in the pipeline
 					if (sslEngine != null) {
@@ -158,9 +160,9 @@ public abstract class RestServerEndpoint {
 					ch.pipeline()
 						.addLast(new HttpServerCodec())
 						.addLast(new FileUploadHandler(uploadDir))
-						.addLast(new HttpObjectAggregator(maxContentLength))
+						.addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
 						.addLast(handler.name(), handler)
-						.addLast(new PipelineErrorHandler(log));
+						.addLast(new PipelineErrorHandler(log, responseHeaders));
 				}
 			};
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 3685e2d..35bd6ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -26,12 +26,16 @@ import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 
 import static java.util.Objects.requireNonNull;
@@ -53,20 +57,24 @@ public final class RestServerEndpointConfiguration {
 
 	private final int maxContentLength;
 
+	private final Map<String, String> responseHeaders;
+
 	private RestServerEndpointConfiguration(
 			@Nullable String restBindAddress,
 			int restBindPort,
 			@Nullable SSLEngine sslEngine,
 			final Path uploadDir,
-			final int maxContentLength) {
+			final int maxContentLength, final Map<String, String> responseHeaders) {
 
 		Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The
bing rest port " + restBindPort + " is out of range (0, 65536[");
+		Preconditions.checkArgument(maxContentLength > 0, "maxContentLength must be positive,
was: %d", maxContentLength);
 
 		this.restBindAddress = restBindAddress;
 		this.restBindPort = restBindPort;
 		this.sslEngine = sslEngine;
 		this.uploadDir = requireNonNull(uploadDir);
 		this.maxContentLength = maxContentLength;
+		this.responseHeaders = requireNonNull(Collections.unmodifiableMap(responseHeaders));
 	}
 
 	/**
@@ -113,6 +121,13 @@ public final class RestServerEndpointConfiguration {
 	}
 
 	/**
+	 * Response headers that should be added to every HTTP response.
+	 */
+	public Map<String, String> getResponseHeaders() {
+		return responseHeaders;
+	}
+
+	/**
 	 * Creates and returns a new {@link RestServerEndpointConfiguration} from the given {@link
Configuration}.
 	 *
 	 * @param config configuration from which the REST server endpoint configuration should
be created from
@@ -144,11 +159,18 @@ public final class RestServerEndpointConfiguration {
 			config.getString(WebOptions.UPLOAD_DIR,	config.getString(WebOptions.TMP_DIR)),
 			"flink-web-upload-" + UUID.randomUUID());
 
-		int maxContentLength = config.getInteger(RestOptions.REST_SERVER_CONTENT_MAX_MB) * 1024
* 1024;
-		if (maxContentLength <= 0) {
-			throw new ConfigurationException("Max content length for server must be a positive integer:
" + maxContentLength);
-		}
+		int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
+
+		final Map<String, String> responseHeaders = Collections.singletonMap(
+			HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,
+			config.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN));
 
-		return new RestServerEndpointConfiguration(address, port, sslEngine, uploadDir, maxContentLength);
+		return new RestServerEndpointConfiguration(
+			address,
+			port,
+			sslEngine,
+			uploadDir,
+			maxContentLength,
+			responseHeaders);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
index 046118a..a16b01f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -30,6 +30,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import org.slf4j.Logger;
 
 import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * This is the last handler in the pipeline. It logs all error messages.
@@ -40,8 +43,11 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpReques
 	/** The logger to which the handler writes the log statements. */
 	private final Logger logger;
 
-	public PipelineErrorHandler(Logger logger) {
-		this.logger = logger;
+	private final Map<String, String> responseHeaders;
+
+	public PipelineErrorHandler(Logger logger, final Map<String, String> responseHeaders)
{
+		this.logger = requireNonNull(logger);
+		this.responseHeaders = requireNonNull(responseHeaders);
 	}
 
 	@Override
@@ -59,5 +65,11 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpReques
 	@Override
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 		logger.warn("Unhandled exception", cause);
+		HandlerUtils.sendErrorResponse(
+			ctx,
+			false,
+			new ErrorResponseBody("Internal server error: " + cause.getMessage()),
+			HttpResponseStatus.INTERNAL_SERVER_ERROR,
+			responseHeaders);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index acdd63c..f92946b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -23,11 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-
 import java.io.File;
-import java.util.Collections;
-import java.util.Map;
 import java.util.UUID;
 
 /**
@@ -43,14 +39,11 @@ public class RestHandlerConfiguration {
 
 	private final File tmpDir;
 
-	private final Map<String, String> responseHeaders;
-
 	public RestHandlerConfiguration(
 			long refreshInterval,
 			int maxCheckpointStatisticCacheEntries,
 			Time timeout,
-			File tmpDir,
-			Map<String, String> responseHeaders) {
+			File tmpDir) {
 		Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should
be larger than 0.");
 		this.refreshInterval = refreshInterval;
 
@@ -58,8 +51,6 @@ public class RestHandlerConfiguration {
 
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.tmpDir = Preconditions.checkNotNull(tmpDir);
-
-		this.responseHeaders = Preconditions.checkNotNull(responseHeaders);
 	}
 
 	public long getRefreshInterval() {
@@ -78,10 +69,6 @@ public class RestHandlerConfiguration {
 		return tmpDir;
 	}
 
-	public Map<String, String> getResponseHeaders() {
-		return Collections.unmodifiableMap(responseHeaders);
-	}
-
 	public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
 		final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);
 
@@ -92,15 +79,10 @@ public class RestHandlerConfiguration {
 		final String rootDir = "flink-web-" + UUID.randomUUID();
 		final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir);
 
-		final Map<String, String> responseHeaders = Collections.singletonMap(
-			HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,
-			configuration.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN));
-
 		return new RestHandlerConfiguration(
 			refreshInterval,
 			maxCheckpointStatisticCacheEntries,
 			timeout,
-			tmpDir,
-			responseHeaders);
+			tmpDir);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
index d1d0837..fc02250 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
@@ -27,20 +27,21 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Map;
 
-import java.util.Collections;
+import static java.util.Objects.requireNonNull;
 
 /**
  * This class is an extension of {@link Handler} that replaces the standard error response
to be identical with those
  * sent by the {@link AbstractRestHandler}.
  */
 public class RouterHandler extends Handler {
-	private static final Logger LOG = LoggerFactory.getLogger(RouterHandler.class);
 
-	public RouterHandler(Router router) {
+	private final Map<String, String> responseHeaders;
+
+	public RouterHandler(Router router, final Map<String, String> responseHeaders) {
 		super(router);
+		this.responseHeaders = requireNonNull(responseHeaders);
 	}
 
 	@Override
@@ -50,6 +51,6 @@ public class RouterHandler extends Handler {
 			request,
 			new ErrorResponseBody("Not found."),
 			HttpResponseStatus.NOT_FOUND,
-			Collections.emptyMap());
+			responseHeaders);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
index a69f4aa..b407ada 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -112,6 +112,30 @@ public class HandlerUtils {
 			HttpResponseStatus statusCode,
 			Map<String, String> headers) {
 
+		sendErrorResponse(
+			channelHandlerContext,
+			HttpHeaders.isKeepAlive(httpRequest),
+			errorMessage,
+			statusCode,
+			headers);
+	}
+
+	/**
+	 * Sends the given error response and status code to the given channel.
+	 *
+	 * @param channelHandlerContext identifying the open channel
+	 * @param keepAlive If the connection should be kept alive.
+	 * @param errorMessage which should be sent
+	 * @param statusCode of the message to send
+	 * @param headers additional header values
+	 */
+	public static void sendErrorResponse(
+			ChannelHandlerContext channelHandlerContext,
+			boolean keepAlive,
+			ErrorResponseBody errorMessage,
+			HttpResponseStatus statusCode,
+			Map<String, String> headers) {
+
 		StringWriter sw = new StringWriter();
 		try {
 			mapper.writeValue(sw, errorMessage);
@@ -120,14 +144,14 @@ public class HandlerUtils {
 			LOG.error("Internal server error. Could not map error response to JSON.", e);
 			sendResponse(
 				channelHandlerContext,
-				httpRequest,
+				keepAlive,
 				"Internal server error. Could not map error response to JSON.",
 				HttpResponseStatus.INTERNAL_SERVER_ERROR,
 				headers);
 		}
 		sendResponse(
 			channelHandlerContext,
-			httpRequest,
+			keepAlive,
 			sw.toString(),
 			statusCode,
 			headers);
@@ -148,6 +172,30 @@ public class HandlerUtils {
 			@Nonnull String message,
 			@Nonnull HttpResponseStatus statusCode,
 			@Nonnull Map<String, String> headers) {
+
+		sendResponse(
+			channelHandlerContext,
+			HttpHeaders.isKeepAlive(httpRequest),
+			message,
+			statusCode,
+			headers);
+	}
+
+	/**
+	 * Sends the given response and status code to the given channel.
+	 *
+	 * @param channelHandlerContext identifying the open channel
+	 * @param keepAlive If the connection should be kept alive.
+	 * @param message which should be sent
+	 * @param statusCode of the message to send
+	 * @param headers additional header values
+	 */
+	public static void sendResponse(
+			@Nonnull ChannelHandlerContext channelHandlerContext,
+			boolean keepAlive,
+			@Nonnull String message,
+			@Nonnull HttpResponseStatus statusCode,
+			@Nonnull Map<String, String> headers) {
 		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode);
 
 		response.headers().set(CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
@@ -156,7 +204,7 @@ public class HandlerUtils {
 			response.headers().set(headerEntry.getKey(), headerEntry.getValue());
 		}
 
-		if (HttpHeaders.isKeepAlive(httpRequest)) {
+		if (keepAlive) {
 			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
 		}
 
@@ -172,7 +220,7 @@ public class HandlerUtils {
 		ChannelFuture lastContentFuture = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
 
 		// close the connection, if no keep-alive is needed
-		if (!HttpHeaders.isKeepAlive(httpRequest)) {
+		if (!keepAlive) {
 			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 10a3650..dfb2fc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -129,7 +129,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -199,7 +198,6 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends
RestServerEndp
 		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers
= new ArrayList<>(30);
 
 		final Time timeout = restConfiguration.getTimeout();
-		final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();
 
 		ClusterOverviewHandler clusterOverviewHandler = new ClusterOverviewHandler(
 			restAddressFuture,

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index c9817ff..32f3ec8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -51,10 +51,10 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -81,7 +81,12 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -96,6 +101,7 @@ public class RestServerEndpointITCase extends TestLogger {
 	private static final JobID QUERY_JOB_ID = new JobID();
 	private static final String JOB_ID_KEY = "jobid";
 	private static final Time timeout = Time.seconds(10L);
+	private static final int TEST_REST_MAX_CONTENT_LENGTH = 4096;
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -103,12 +109,15 @@ public class RestServerEndpointITCase extends TestLogger {
 	private RestServerEndpoint serverEndpoint;
 	private RestClient restClient;
 	private TestUploadHandler testUploadHandler;
+	private InetSocketAddress serverAddress;
 
 	@Before
 	public void setup() throws Exception {
 		Configuration config = new Configuration();
 		config.setInteger(RestOptions.REST_PORT, 0);
 		config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
+		config.setInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
+		config.setInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
 		RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
@@ -133,6 +142,7 @@ public class RestServerEndpointITCase extends TestLogger {
 		restClient = new TestRestClient(clientConfig);
 
 		serverEndpoint.start();
+		serverAddress = serverEndpoint.getServerAddress();
 	}
 
 	@After
@@ -161,7 +171,6 @@ public class RestServerEndpointITCase extends TestLogger {
 
 		// send first request and wait until the handler blocks
 		CompletableFuture<TestResponse> response1;
-		final InetSocketAddress serverAddress = serverEndpoint.getServerAddress();
 
 		synchronized (TestHandler.LOCK) {
 			response1 = restClient.sendRequest(
@@ -198,8 +207,6 @@ public class RestServerEndpointITCase extends TestLogger {
 	 */
 	@Test
 	public void testBadHandlerRequest() throws Exception {
-		final InetSocketAddress serverAddress = serverEndpoint.getServerAddress();
-
 		final FaultyTestParameters parameters = new FaultyTestParameters();
 
 		parameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID);
@@ -215,11 +222,11 @@ public class RestServerEndpointITCase extends TestLogger {
 		try {
 			response.get();
 
-			Assert.fail("The request should fail with a bad request return code.");
+			fail("The request should fail with a bad request return code.");
 		} catch (ExecutionException ee) {
 			Throwable t = ExceptionUtils.stripExecutionException(ee);
 
-			Assert.assertTrue(t instanceof RestClientException);
+			assertTrue(t instanceof RestClientException);
 
 			RestClientException rce = (RestClientException) t;
 
@@ -228,6 +235,50 @@ public class RestServerEndpointITCase extends TestLogger {
 	}
 
 	/**
+	 * Tests that requests and responses larger than {@link #TEST_REST_MAX_CONTENT_LENGTH}
+	 * are rejected by the server and client, respectively.
+	 */
+	@Test
+	public void testMaxContentLengthLimit() throws Exception {
+		final TestParameters parameters = new TestParameters();
+		parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+		parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+
+		CompletableFuture<TestResponse> response;
+		response = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			new TestHeaders(),
+			parameters,
+			new TestRequest(2, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+
+		try {
+			response.get();
+			fail("Expected exception not thrown");
+		} catch (final ExecutionException e) {
+			final Throwable throwable = ExceptionUtils.stripExecutionException(e);
+			assertThat(throwable, instanceOf(RestClientException.class));
+			assertThat(throwable.getMessage(), containsString("Try to raise"));
+		}
+
+		response = restClient.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			new TestHeaders(),
+			parameters,
+			new TestRequest(TestHandler.LARGE_RESPONSE_BODY_ID));
+
+		try {
+			response.get();
+			fail("Expected exception not thrown");
+		} catch (final ExecutionException e) {
+			final Throwable throwable = ExceptionUtils.stripExecutionException(e);
+			assertThat(throwable, instanceOf(TooLongFrameException.class));
+			assertThat(throwable.getMessage(), containsString("Try to raise"));
+		}
+	}
+
+	/**
 	 * Tests that multipart/form-data uploads work correctly.
 	 *
 	 * @see FileUploadHandler
@@ -294,6 +345,14 @@ public class RestServerEndpointITCase extends TestLogger {
 		return Long.toHexString(System.currentTimeMillis());
 	}
 
+	private static String createStringOfSize(int size) {
+		StringBuilder sb = new StringBuilder(size);
+		for (int i = 0; i < size; i++) {
+			sb.append('a');
+		}
+		return sb.toString();
+	}
+
 	private static class TestRestServerEndpoint extends RestServerEndpoint {
 
 		private final TestHandler testHandler;
@@ -323,12 +382,14 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest,
TestResponse, TestParameters> {
 
-		public static final Object LOCK = new Object();
+		private static final Object LOCK = new Object();
+
+		private static final int LARGE_RESPONSE_BODY_ID = 3;
 
 		TestHandler(
-			CompletableFuture<String> localAddressFuture,
-			GatewayRetriever<RestfulGateway> leaderRetriever,
-			Time timeout) {
+				CompletableFuture<String> localAddressFuture,
+				GatewayRetriever<RestfulGateway> leaderRetriever,
+				Time timeout) {
 			super(
 				localAddressFuture,
 				leaderRetriever,
@@ -342,7 +403,8 @@ public class RestServerEndpointITCase extends TestLogger {
 			assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID);
 			assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), QUERY_JOB_ID);
 
-			if (request.getRequestBody().id == 1) {
+			final int id = request.getRequestBody().id;
+			if (id == 1) {
 				synchronized (LOCK) {
 					try {
 						LOCK.notifyAll();
@@ -350,8 +412,12 @@ public class RestServerEndpointITCase extends TestLogger {
 					} catch (InterruptedException ignored) {
 					}
 				}
+			} else if (id == LARGE_RESPONSE_BODY_ID) {
+				return CompletableFuture.completedFuture(new TestResponse(
+					id,
+					createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
 			}
-			return CompletableFuture.completedFuture(new TestResponse(request.getRequestBody().id));
+			return CompletableFuture.completedFuture(new TestResponse(id));
 		}
 	}
 
@@ -365,18 +431,37 @@ public class RestServerEndpointITCase extends TestLogger {
 	private static class TestRequest implements RequestBody {
 		public final int id;
 
+		public final String content;
+
+		public TestRequest(int id) {
+			this(id, null);
+		}
+
 		@JsonCreator
-		public TestRequest(@JsonProperty("id") int id) {
+		public TestRequest(
+				@JsonProperty("id") int id,
+				@JsonProperty("content") final String content) {
 			this.id = id;
+			this.content = content;
 		}
 	}
 
 	private static class TestResponse implements ResponseBody {
+
 		public final int id;
 
+		public final String content;
+
+		public TestResponse(int id) {
+			this(id, null);
+		}
+
 		@JsonCreator
-		public TestResponse(@JsonProperty("id") int id) {
+		public TestResponse(
+				@JsonProperty("id") int id,
+				@JsonProperty("content") String content) {
 			this.id = id;
+			this.content = content;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 997601f..af8b995 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -126,7 +126,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger
{
 			CompletableFuture.completedFuture("127.0.0.1:9527"),
 			() -> null,
 			Time.milliseconds(100),
-			restHandlerConfiguration.getResponseHeaders(),
+			Collections.emptyMap(),
 			SubtaskCurrentAttemptDetailsHeaders.getInstance(),
 			new ExecutionGraphCache(
 				restHandlerConfiguration.getTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 5f03c55..318541d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -62,7 +63,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger
{
 			CompletableFuture.completedFuture("127.0.0.1:9527"),
 			() -> null,
 			Time.milliseconds(100L),
-			restHandlerConfiguration.getResponseHeaders(),
+			Collections.emptyMap(),
 			SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(),
 			new ExecutionGraphCache(
 				restHandlerConfiguration.getTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/da3fc4fd/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index d55ab77..8e44c0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -129,7 +129,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger
{
 			CompletableFuture.completedFuture("127.0.0.1:9527"),
 			() -> null,
 			Time.milliseconds(100L),
-			restHandlerConfiguration.getResponseHeaders(),
+			Collections.emptyMap(),
 			SubtaskExecutionAttemptDetailsHeaders.getInstance(),
 			new ExecutionGraphCache(
 				restHandlerConfiguration.getTimeout(),


Mime
View raw message