flink-issues mailing list archives

From: GitBox <...@apache.org>
Subject: [GitHub] zentol closed pull request #6651: [1.5][FLINK-10115][rest] Ignore content-length limit for FileUploads
Date: Mon, 03 Sep 2018 11:39:26 GMT
zentol closed pull request #6651: [1.5][FLINK-10115][rest] Ignore content-length limit for FileUploads
URL: https://github.com/apache/flink/pull/6651
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index e8d93845107..0a0d833626e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -141,6 +141,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, T gateway
 					hre);
 			}
 
+			log.trace("Starting request processing.");
 			CompletableFuture<Void> requestProcessingFuture = respondToRequest(
 				ctx,
 				httpRequest,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index c58e86ed1b9..2fdaafaeee2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -21,6 +21,7 @@
 import org.apache.flink.runtime.rest.handler.FileUploads;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -30,6 +31,7 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultLastHttpContent;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -97,6 +99,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg
 				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
 				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
 					if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
+						LOG.trace("Initializing multipart file upload.");
 						checkState(currentHttpPostRequestDecoder == null);
 						checkState(currentHttpRequest == null);
 						checkState(currentUploadDir == null);
@@ -110,6 +113,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg
 					ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
 				}
 			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
+				LOG.trace("Received http content.");
 				// make sure that we still have a upload dir in case that it got deleted in the meanwhile
 				RestServerEndpoint.createUploadDir(uploadDir, LOG);
 
@@ -124,9 +128,11 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg
 
 						final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
 						fileUpload.renameTo(dest.toFile());
+						LOG.trace("Upload of file {} complete.", fileUpload.getFilename());
 					} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
 						final Attribute request = (Attribute) data;
 						// this could also be implemented by using the first found Attribute as the payload
+						LOG.trace("Upload of attribute {} complete.", request.getName());
 						if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
 							currentJsonPayload = request.get();
 						} else {
@@ -137,17 +143,23 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg
 				}
 
 				if (httpContent instanceof LastHttpContent) {
+					LOG.trace("Finalizing multipart file upload.");
 					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
-					ctx.fireChannelRead(currentHttpRequest);
 					if (currentJsonPayload != null) {
 						// the following lines behave similar to httpContent#replace in netty 4.1
 						// the only difference is that the validateHeaders flag isn't preserved
 						// this shouldn't be a problem since we only copy existing headers
 						DefaultLastHttpContent newContent = new DefaultLastHttpContent(Unpooled.wrappedBuffer(currentJsonPayload), false);
 						newContent.trailingHeaders().set(((LastHttpContent) httpContent).trailingHeaders());
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, currentJsonPayload.length);
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
 						ctx.fireChannelRead(newContent);
 					} else {
-						ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent));
+						currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+						currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
+						ctx.fireChannelRead(currentHttpRequest);
+						ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
 					}
 					reset();
 				}
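
For readers of the diff: the block above no longer forwards the original HttpRequest as soon as it arrives. It re-fires it only after the multipart body has been fully consumed, with Content-Length and Content-Type rewritten to describe just the extracted JSON attribute (or an empty body). The following is a minimal sketch of that forwarding pattern, written against plain Netty 4 rather than Flink's shaded classes; ForwardingSketch, forward and jsonPayload are illustrative names, and "application/json" stands in for RestConstants.REST_CONTENT_TYPE:

// Minimal sketch (not Flink code) of the forwarding pattern used above,
// with plain Netty 4 imports instead of Flink's shaded ones.
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;

final class ForwardingSketch {

	static void forward(ChannelHandlerContext ctx, HttpRequest originalRequest, byte[] jsonPayload) {
		if (jsonPayload != null) {
			// Rewrite the headers so they describe only the small JSON attribute,
			// not the (potentially much larger) multipart body already written to disk.
			originalRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, jsonPayload.length);
			originalRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
			ctx.fireChannelRead(originalRequest);
			ctx.fireChannelRead(new DefaultLastHttpContent(Unpooled.wrappedBuffer(jsonPayload)));
		} else {
			// No JSON attribute was uploaded: forward the request with an empty body.
			originalRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
			originalRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
			ctx.fireChannelRead(originalRequest);
			ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
		}
	}
}

Because downstream handlers now see a request whose declared length is at most the size of the JSON attribute, the server's content-length check should no longer reject uploads whose file parts exceed the configured limit.
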
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0153d5dd31a..c350393371a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -102,6 +102,8 @@ public void before() throws Exception {
 		Configuration config = new Configuration();
 		config.setInteger(RestOptions.PORT, 0);
 		config.setString(RestOptions.ADDRESS, "localhost");
+		// set this to a lower value on purpose to test that files larger than the content limit are still accepted
+		config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 1024);
 		configuredUploadDir = temporaryFolder.newFolder().toPath();
 		config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString());
 
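
As a note on the test change above: RestOptions.SERVER_MAX_CONTENT_LENGTH is deliberately lowered to 1 MiB so that uploading files larger than that verifies the limit is ignored for multipart file uploads. Below is a rough sketch of the corresponding server-side setup, assuming these options live in the usual org.apache.flink.configuration package; RestUploadConfigSketch and buildConfig are illustrative names, and the values mirror the test rather than being recommendations:

// Rough configuration sketch mirroring the test resource above; values are illustrative.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;

public class RestUploadConfigSketch {

	public static Configuration buildConfig(String uploadDir) {
		Configuration config = new Configuration();
		config.setString(RestOptions.ADDRESS, "localhost");
		config.setInteger(RestOptions.PORT, 0);                                 // 0 = pick any free port
		config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 1024);  // cap for regular REST bodies
		config.setString(WebOptions.UPLOAD_DIR, uploadDir);                     // where multipart files are staged
		return config;
	}
}

With the FileUploadHandler change, file parts are written to the upload directory before the rebuilt request is forwarded, so only the JSON part (if any) should count against the downstream content-length check.
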


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
