flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1579) Create a Flink History Server
Date Mon, 06 Mar 2017 15:25:38 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897495#comment-15897495
] 

ASF GitHub Bot commented on FLINK-1579:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3460#discussion_r104431499
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
---
    @@ -205,215 +125,18 @@ public void channelRead0(ChannelHandlerContext ctx, Routed routed)
throws Except
     		}
     	}
     
    -	/**
    -	 * Response when running with leading JobManager.
    -	 */
    -	private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest request, String
requestPath)
    -			throws IOException, ParseException, URISyntaxException {
    -
    -		// convert to absolute path
    -		final File file = new File(rootPath, requestPath);
    -
    -		if (!file.exists()) {
    -			// file does not exist. Try to load it with the classloader
    -			ClassLoader cl = StaticFileServerHandler.class.getClassLoader();
    -
    -			try(InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) {
    -				boolean success = false;
    -				try {
    -					if (resourceStream != null) {
    -						URL root = cl.getResource("web");
    -						URL requested = cl.getResource("web" + requestPath);
    -
    -						if (root != null && requested != null) {
    -							URI rootURI = new URI(root.getPath()).normalize();
    -							URI requestedURI = new URI(requested.getPath()).normalize();
    -
    -							// Check that we don't load anything from outside of the
    -							// expected scope.
    -							if (!rootURI.relativize(requestedURI).equals(requestedURI)) {
    -								logger.debug("Loading missing file from classloader: {}", requestPath);
    -								// ensure that directory to file exists.
    -								file.getParentFile().mkdirs();
    -								Files.copy(resourceStream, file.toPath());
    -
    -								success = true;
    -							}
    -						}
    -					}
    -				} catch (Throwable t) {
    -					logger.error("error while responding", t);
    -				} finally {
    -					if (!success) {
    -						logger.debug("Unable to load requested file {} from classloader", requestPath);
    -						sendError(ctx, NOT_FOUND);
    -						return;
    -					}
    -				}
    -			}
    -		}
    -
    -		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
    -			sendError(ctx, NOT_FOUND);
    -			return;
    -		}
    -
    -		if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
    -			sendError(ctx, NOT_FOUND);
    -			return;
    -		}
    -
    -		// cache validation
    -		final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE);
    -		if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
    -			SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
    -			Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);
    -
    -			// Only compare up to the second because the datetime format we send to the client
    -			// does not have milliseconds
    -			long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
    -			long fileLastModifiedSeconds = file.lastModified() / 1000;
    -			if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
    -				if (logger.isDebugEnabled()) {
    -					logger.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\'');
    -				}
    -
    -				sendNotModified(ctx);
    -				return;
    -			}
    -		}
    -		
    -		if (logger.isDebugEnabled()) {
    -			logger.debug("Responding with file '" + file.getAbsolutePath() + '\'');
    -		}
    -
    -		// Don't need to close this manually. Netty's DefaultFileRegion will take care of it.
    -		final RandomAccessFile raf;
    -		try {
    -			raf = new RandomAccessFile(file, "r");
    -		}
    -		catch (FileNotFoundException e) {
    -			sendError(ctx, NOT_FOUND);
    -			return;
    -		}
    -		long fileLength = raf.length();
    -
    -		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    -		setContentTypeHeader(response, file);
    -
    -		// since the log and out files are rapidly changing, we don't want to browser to cache
them
    -		if (!(requestPath.contains("log") || requestPath.contains("out"))) {
    -			setDateAndCacheHeaders(response, file);
    -		}
    -		if (HttpHeaders.isKeepAlive(request)) {
    -			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
    -		}
    -		HttpHeaders.setContentLength(response, fileLength);
    -
    -		// write the initial line and the header.
    -		ctx.write(response);
    -
    -		// write the content.
    -		ChannelFuture lastContentFuture;
    -		if (ctx.pipeline().get(SslHandler.class) == null) {
    -			ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
    -			lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
    +	@Override
    +	public String preProcessRequestPath(String requestPath) {
    +		// in case the files being accessed are logs or stdout files, find appropriate paths.
    +		if (requestPath.equals("/jobmanager/log") || requestPath.equals("/jobmanager/stdout"))
{
    +			return "";
     		} else {
    -			lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0,
fileLength, 8192)),
    -				ctx.newProgressivePromise());
    -			// HttpChunkedInput will write the end marker (LastHttpContent) for us.
    -		}
    -
    -		// close the connection, if no keep-alive is needed
    -		if (!HttpHeaders.isKeepAlive(request)) {
    -			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
    +			return requestPath;
     		}
     	}
     
     	@Override
    -	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    -		if (ctx.channel().isActive()) {
    -			logger.error("Caught exception", cause);
    -			sendError(ctx, INTERNAL_SERVER_ERROR);
    -		}
    -	}
    -
    -	// ------------------------------------------------------------------------
    -	//  Utilities to encode headers and responses
    -	// ------------------------------------------------------------------------
    -
    -	/**
    -	 * Writes a simple  error response message.
    -	 *
    -	 * @param ctx    The channel context to write the response to.
    -	 * @param status The response status.
    -	 */
    -	private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status)
{
    -		FullHttpResponse response = new DefaultFullHttpResponse(
    -				HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
    -		response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
    -
    -		// close the connection as soon as the error message is sent.
    -		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    -	}
    -
    -	/**
    -	 * Send the "304 Not Modified" response. This response can be used when the
    -	 * file timestamp is the same as what the browser is sending up.
    -	 *
    -	 * @param ctx The channel context to write the response to.
    -	 */
    -	private static void sendNotModified(ChannelHandlerContext ctx) {
    -		FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
    -		setDateHeader(response);
    -
    -		// close the connection as soon as the error message is sent.
    -		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    -	}
    -
    -	/**
    -	 * Sets the "date" header for the HTTP response.
    -	 *
    -	 * @param response HTTP response
    -	 */
    -	private static void setDateHeader(FullHttpResponse response) {
    -		SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
    -		dateFormatter.setTimeZone(GMT_TIMEZONE);
    -
    -		Calendar time = new GregorianCalendar();
    -		response.headers().set(DATE, dateFormatter.format(time.getTime()));
    -	}
    -
    -	/**
    -	 * Sets the "date" and "cache" headers for the HTTP Response.
    -	 *
    -	 * @param response    The HTTP response object.
    -	 * @param fileToCache File to extract the modification timestamp from.
    -	 */
    -	private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache)
{
    -		SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
    -		dateFormatter.setTimeZone(GMT_TIMEZONE);
    -
    -		// date header
    -		Calendar time = new GregorianCalendar();
    -		response.headers().set(DATE, dateFormatter.format(time.getTime()));
    -
    -		// cache headers
    -		time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
    -		response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
    -		response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
    -		response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
    -	}
    -
    -	/**
    -	 * Sets the content type header for the HTTP Response.
    -	 *
    -	 * @param response HTTP response
    -	 * @param file     file to extract content type
    -	 */
    -	private static void setContentTypeHeader(HttpResponse response, File file) {
    -		String mimeType = MimeTypes.getMimeTypeForFileName(file.getName());
    -		String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType();
    -		response.headers().set(CONTENT_TYPE, mimeFinal);
    +	protected boolean shouldCache(String requestPath) {
    +		return !(requestPath.contains("log") || requestPath.contains("out"));
    --- End diff --
    
    Should we make this more explicit in order to prevent accidental non-caching of requests
whose paths contain "out" or "log" for another reason?


> Create a Flink History Server
> -----------------------------
>
>                 Key: FLINK-1579
>                 URL: https://issues.apache.org/jira/browse/FLINK-1579
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Coordination
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Chesnay Schepler
>
> Right now it's not possible to analyze the job results for jobs that ran on YARN, because
we'll lose the information once the JobManager has stopped.
> Therefore, I propose to implement a "Flink History Server" which serves the results
from these jobs.
> I haven't started thinking about the implementation, but I suspect it involves some JSON
files stored in HDFS :)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message