flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zentol <...@git.apache.org>
Subject [GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Date Wed, 20 Jun 2018 15:53:00 GMT
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196836575
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
---
    @@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
     
     	@Override
     	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws
Exception {
    -		if (msg instanceof HttpRequest) {
    -			final HttpRequest httpRequest = (HttpRequest) msg;
    -			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
    -				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
    -					currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    -					currentHttpRequest = httpRequest;
    +		try {
    +			if (msg instanceof HttpRequest) {
    +				final HttpRequest httpRequest = (HttpRequest) msg;
    +				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
    +				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
    +					if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
    +						currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    +						currentHttpRequest = httpRequest;
    +						currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
    +					} else {
    +						ctx.fireChannelRead(msg);
    +					}
     				} else {
     					ctx.fireChannelRead(msg);
     				}
    +			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder !=
null) {
    +				// make sure that we still have a upload dir in case that it got deleted in the meanwhile
    +				RestServerEndpoint.createUploadDir(uploadDir, LOG);
    +
    +				final HttpContent httpContent = (HttpContent) msg;
    +				currentHttpPostRequestDecoder.offer(httpContent);
    +
    +				while (currentHttpPostRequestDecoder.hasNext()) {
    +					final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
    +					if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
    +						final DiskFileUpload fileUpload = (DiskFileUpload) data;
    +						checkState(fileUpload.isCompleted());
    +
    +						final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
    +						fileUpload.renameTo(dest.toFile());
    +					} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +						final Attribute request = (Attribute) data;
    +						// this could also be implemented by using the first found Attribute as the payload
    +						if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +							currentJsonPayload = request.get();
    +						} else {
    +							LOG.warn("Received unknown attribute {}.", data.getName());
    +							HandlerUtils.sendErrorResponse(
    +								ctx,
    +								currentHttpRequest,
    +								new ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
    +								HttpResponseStatus.BAD_REQUEST,
    +								Collections.emptyMap()
    +							);
    +							deleteUploadedFiles();
    +							reset();
    +							return;
    +						}
    +					}
    +				}
    +
    +				if (httpContent instanceof LastHttpContent) {
    +					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir)));
    +					ctx.fireChannelRead(currentHttpRequest);
    +					if (currentJsonPayload != null) {
    +						ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
    +					} else {
    +						ctx.fireChannelRead(httpContent);
    +					}
    +					reset();
    +				}
     			} else {
     				ctx.fireChannelRead(msg);
     			}
    -		} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null)
{
    -			// make sure that we still have a upload dir in case that it got deleted in the meanwhile
    -			RestServerEndpoint.createUploadDir(uploadDir, LOG);
    -
    -			final HttpContent httpContent = (HttpContent) msg;
    -			currentHttpPostRequestDecoder.offer(httpContent);
    -
    -			while (currentHttpPostRequestDecoder.hasNext()) {
    -				final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
    -				if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
    -					final DiskFileUpload fileUpload = (DiskFileUpload) data;
    -					checkState(fileUpload.isCompleted());
    -
    -					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -						"_" + fileUpload.getFilename()));
    -					fileUpload.renameTo(dest.toFile());
    -					ctx.channel().attr(UPLOADED_FILE).set(dest);
    -				}
    -			}
    +		} catch (Exception e) {
    +			LOG.warn("Internal server error. File upload failed.", e);
    +			HandlerUtils.sendErrorResponse(
    +				ctx,
    +				currentHttpRequest,
    +				new ErrorResponseBody("File upload failed."),
    +				HttpResponseStatus.INTERNAL_SERVER_ERROR,
    +				Collections.emptyMap()
    +			);
    +			deleteUploadedFiles();
    +			reset();
    +		}
    +	}
     
    -			if (httpContent instanceof LastHttpContent) {
    -				ctx.fireChannelRead(currentHttpRequest);
    -				ctx.fireChannelRead(httpContent);
    -				reset();
    +	private void deleteUploadedFiles() {
    +		if (currentUploadDir != null) {
    +			try (FileUploads uploads = new FileUploads(Collections.singleton(currentUploadDir)))
{
    --- End diff --
    
    The idea was to define the cleanup logic in one place, in this case in the `FileUploads`
class.


---

Mime
View raw message