flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/7] flink git commit: [FLINK-5629] [runtime-web] Close RAF in FileServerHandlers when exception occurs
Date Sat, 15 Apr 2017 17:50:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master a30f29f68 -> 283f5efd5


[FLINK-5629] [runtime-web] Close RAF in FileServerHandlers when exception occurs

This closes #3678.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be26f7ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be26f7ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be26f7ed

Branch: refs/heads/master
Commit: be26f7ed1e2b97bc2e6ab06d6267f8d542d78aee
Parents: a30f29f
Author: zentol <chesnay@apache.org>
Authored: Wed Apr 5 23:37:06 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Sat Apr 15 13:36:30 2017 +0200

----------------------------------------------------------------------
 .../files/StaticFileServerHandler.java          | 61 ++++++++++---------
 .../HistoryServerStaticFileServerHandler.java   | 63 +++++++++++---------
 2 files changed, 69 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be26f7ed/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index b7874c9..406baf0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -296,37 +296,44 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 			sendError(ctx, NOT_FOUND);
 			return;
 		}
-		long fileLength = raf.length();
 
-		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-		setContentTypeHeader(response, file);
-
-		// since the log and out files are rapidly changing, we don't want to browser to cache
them
-		if (!(requestPath.contains("log") || requestPath.contains("out"))) {
-			setDateAndCacheHeaders(response, file);
-		}
-		if (HttpHeaders.isKeepAlive(request)) {
-			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-		}
-		HttpHeaders.setContentLength(response, fileLength);
+		try {
+			long fileLength = raf.length();
 
-		// write the initial line and the header.
-		ctx.write(response);
+			HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+			setContentTypeHeader(response, file);
 
-		// write the content.
-		ChannelFuture lastContentFuture;
-		if (ctx.pipeline().get(SslHandler.class) == null) {
-			ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
-			lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-		} else {
-			lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength,
8192)),
-				ctx.newProgressivePromise());
-			// HttpChunkedInput will write the end marker (LastHttpContent) for us.
-		}
+			// since the log and out files are rapidly changing, we don't want to browser to cache
them
+			if (!(requestPath.contains("log") || requestPath.contains("out"))) {
+				setDateAndCacheHeaders(response, file);
+			}
+			if (HttpHeaders.isKeepAlive(request)) {
+				response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+			}
+			HttpHeaders.setContentLength(response, fileLength);
+
+			// write the initial line and the header.
+			ctx.write(response);
+
+			// write the content.
+			ChannelFuture lastContentFuture;
+			if (ctx.pipeline().get(SslHandler.class) == null) {
+				ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
+				lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+			} else {
+				lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength,
8192)),
+					ctx.newProgressivePromise());
+				// HttpChunkedInput will write the end marker (LastHttpContent) for us.
+			}
 
-		// close the connection, if no keep-alive is needed
-		if (!HttpHeaders.isKeepAlive(request)) {
-			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+			// close the connection, if no keep-alive is needed
+			if (!HttpHeaders.isKeepAlive(request)) {
+				lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+			}
+		} catch (Exception e) {
+			raf.close();
+			logger.error("Failed to serve file.", e);
+			sendError(ctx, INTERNAL_SERVER_ERROR);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/be26f7ed/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index 31e9bbc..ba0e2d2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -205,37 +205,44 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
 			StaticFileServerHandler.sendError(ctx, NOT_FOUND);
 			return;
 		}
-		long fileLength = raf.length();
 
-		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-		StaticFileServerHandler.setContentTypeHeader(response, file);
+		try {
+			long fileLength = raf.length();
 
-		// the job overview should be updated as soon as possible
-		if (!requestPath.equals("/joboverview.json")) {
-			StaticFileServerHandler.setDateAndCacheHeaders(response, file);
-		}
-		if (HttpHeaders.isKeepAlive(request)) {
-			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-		}
-		HttpHeaders.setContentLength(response, fileLength);
-
-		// write the initial line and the header.
-		ctx.write(response);
-
-		// write the content.
-		ChannelFuture lastContentFuture;
-		if (ctx.pipeline().get(SslHandler.class) == null) {
-			ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
-			lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-		} else {
-			lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength,
8192)),
-				ctx.newProgressivePromise());
-			// HttpChunkedInput will write the end marker (LastHttpContent) for us.
-		}
+			HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+			StaticFileServerHandler.setContentTypeHeader(response, file);
+
+			// the job overview should be updated as soon as possible
+			if (!requestPath.equals("/joboverview.json")) {
+				StaticFileServerHandler.setDateAndCacheHeaders(response, file);
+			}
+			if (HttpHeaders.isKeepAlive(request)) {
+				response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+			}
+			HttpHeaders.setContentLength(response, fileLength);
+
+			// write the initial line and the header.
+			ctx.write(response);
+
+			// write the content.
+			ChannelFuture lastContentFuture;
+			if (ctx.pipeline().get(SslHandler.class) == null) {
+				ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
+				lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+			} else {
+				lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength,
8192)),
+					ctx.newProgressivePromise());
+				// HttpChunkedInput will write the end marker (LastHttpContent) for us.
+			}
 
-		// close the connection, if no keep-alive is needed
-		if (!HttpHeaders.isKeepAlive(request)) {
-			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+			// close the connection, if no keep-alive is needed
+			if (!HttpHeaders.isKeepAlive(request)) {
+				lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+			}
+		} catch (Exception e) {
+			raf.close();
+			LOG.error("Failed to serve file.", e);
+			StaticFileServerHandler.sendError(ctx, INTERNAL_SERVER_ERROR);
 		}
 	}
 


Mime
View raw message