flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-3125] [web dashboard] Web server starts also when JobManager log files cannot be accessed.
Date Sun, 06 Dec 2015 21:35:19 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.10 319e18045 -> a36822100


[FLINK-3125] [web dashboard] Web server starts also when JobManager log files cannot be accessed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3682210
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3682210
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3682210

Branch: refs/heads/release-0.10
Commit: a368221006033ae8a48d2e81dc535a3eda0b0b87
Parents: 319e180
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Dec 6 18:07:26 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Dec 6 18:19:17 2015 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 26 +++---
 .../handlers/ConstantTextHandler.java           | 67 ++++++++++++++
 .../runtime/webmonitor/WebMonitorUtils.java     | 91 +++++++++-----------
 .../flink/test/web/WebFrontendITCase.java       | 11 +--
 4 files changed, 125 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3682210/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 03a25a2..a7c6eb2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
@@ -99,8 +100,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private final Router router;
 
-	private final int configuredPort;
-
 	private final ServerBootstrap bootstrap;
 
 	private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
@@ -118,6 +117,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			Configuration config,
 			LeaderRetrievalService leaderRetrievalService,
 			ActorSystem actorSystem) throws IOException, InterruptedException {
+		
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
 
 		final WebMonitorConfig cfg = new WebMonitorConfig(config);
@@ -127,15 +127,12 @@ public class WebRuntimeMonitor implements WebMonitor {
 		webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
 		LOG.info("Using directory {} for the web interface files", webRootDir);
 
-		final WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(config);
-
-		LOG.info("Serving job manager log from {}", logFiles.logFile.getAbsolutePath());
-		LOG.info("Serving job manager stdout from {}", logFiles.stdOutFile.getAbsolutePath());
-
+		final WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(config);
+		
 		// port configuration
-		this.configuredPort = cfg.getWebFrontendPort();
-		if (this.configuredPort < 0) {
-			throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
+		int configuredPort = cfg.getWebFrontendPort();
+		if (configuredPort < 0) {
+			throw new IllegalArgumentException("Web frontend port is invalid: " + configuredPort);
 		}
 
 		timeout = AkkaUtils.getTimeout(config);
@@ -144,7 +141,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		retriever = new JobManagerRetriever(this, actorSystem, lookupTimeout, timeout);
 
 		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();
-
+		
 		router = new Router()
 			// config how to interact with this web server
 			.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
@@ -183,8 +180,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
 
 			// log and stdout
-			.GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile))
-			.GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))
+			.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
+				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile))
+
+			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
+				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))
 
 			// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
 			.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))

http://git-wip-us.apache.org/repos/asf/flink/blob/a3682210/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
new file mode 100644
index 0000000..30d825f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.KeepAliveWrite;
+import io.netty.handler.codec.http.router.Routed;
+
+import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Responder that returns a constant String.
+ */
+@ChannelHandler.Sharable
+public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
+	
+	private static final String MIME_TYPE = MimeTypes.getMimeTypeForExtension("txt");
+	
+	private final byte[] encodedText;
+	
+	public ConstantTextHandler(String text) {
+		try {
+			this.encodedText = text.getBytes("UTF-8");
+		}
+		catch (UnsupportedEncodingException e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
+	}
+	
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+		HttpResponse response = new DefaultFullHttpResponse(
+			HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText));
+
+		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length);
+		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, MIME_TYPE);
+
+		KeepAliveWrite.flush(ctx, routed.request(), response);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3682210/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index a7a3458..41cb83b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -29,9 +29,11 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,60 +55,53 @@ public final class WebMonitorUtils {
 	/**
 	 * Singleton to hold the log and stdout file
 	 */
-	public static class LogFiles {
-
-		private static LogFiles INSTANCE;
+	public static class LogFileLocation {
 
 		public final File logFile;
 		public final File stdOutFile;
 
-		private LogFiles(String logFile) {
-			this.logFile = checkFileLocation(logFile);
-			String stdOutFile = logFile.replaceFirst("\\.log$", ".out");
-			this.stdOutFile = checkFileLocation(stdOutFile);;
+		private LogFileLocation(File logFile, File stdOutFile) {
+			this.logFile = logFile;
+			this.stdOutFile = stdOutFile;
 		}
+		
 
 		/**
-		 * Verify log file location
-		 * @param logFilePath Path to log file
-		 * @return File or null if not a valid log file
+		 * Finds the Flink log directory using log.file Java property that is set during startup.
 		 */
-		private static File checkFileLocation (String logFilePath) {
-			File logFile = new File(logFilePath);
-			if (logFile.exists() && logFile.canRead()) {
-				return logFile;
-			} else {
-				throw new IllegalConfigurationException("Job manager log file was supposed to be at " +
-						logFile.getAbsolutePath() + " but it does not exist or is not readable.");
+		public static LogFileLocation find(Configuration config) {
+			final String logEnv = "log.file";
+			String logFilePath = System.getProperty(logEnv);
+			
+			if (logFilePath == null) {
+				LOG.warn("Log file environment variable '{}' is not set.", logEnv);
+				logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
+			}
+			
+			// not configured, cannot serve log files
+			if (logFilePath == null || logFilePath.length() < 4) {
+				LOG.warn("JobManager log files are unavailable in the web dashboard. " +
+					"Log file location not found in environment variable '{}' or configuration key '{}'.",
+					logEnv, ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
+				return new LogFileLocation(null, null);
 			}
+			
+			String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out");
+
+			LOG.info("Determined location of JobManager log file: {}", logFilePath);
+			LOG.info("Determined location of JobManager stdout file: {}", outFilePath);
+			
+			return new LogFileLocation(resolveFileLocation(logFilePath), resolveFileLocation(outFilePath));
 		}
 
 		/**
-		 * Finds the Flink log directory using log.file Java property that is set during startup.
+		 * Verify log file location
+		 * @param logFilePath Path to log file
+		 * @return File or null if not a valid log file
 		 */
-		public static LogFiles find(Configuration config) {
-			if (INSTANCE == null) {
-
-				/** Figure out log file location based on 'log.file' VM argument **/
-				final String logEnv = "log.file";
-				String logFilePath = System.getProperty(logEnv);
-
-				if (logFilePath == null) {
-					LOG.warn("Log file environment variable '{}' is not set.", logEnv);
-					logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
-				}
-
-				if (logFilePath == null) {
-					throw new IllegalConfigurationException("JobManager log file not found. " +
-							"Can't serve log files. Log file location couldn't be determined via the " +
-							logEnv + " environment variable or the config constant " +
-							ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
-				}
-
-				INSTANCE = new LogFiles(logFilePath);
-			}
-
-			return INSTANCE;
+		private static File resolveFileLocation(String logFilePath) {
+			File logFile = new File(logFilePath);
+			return (logFile.exists() && logFile.canRead()) ? logFile : null;
 		}
 	}
 
@@ -127,9 +122,9 @@ public final class WebMonitorUtils {
 		// try to load and instantiate the class
 		try {
 			String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
-			Class clazz = Class.forName(classname).asSubclass(WebMonitor.class);
-			@SuppressWarnings("unchecked")
-			Constructor<WebMonitor> constructor = clazz.getConstructor(Configuration.class,
+			Class<? extends WebMonitor> clazz = Class.forName(classname).asSubclass(WebMonitor.class);
+			
+			Constructor<? extends WebMonitor> constructor = clazz.getConstructor(Configuration.class,
 					LeaderRetrievalService.class,
 					ActorSystem.class);
 			return constructor.newInstance(config, leaderRetrievalService, actorSystem);
@@ -147,7 +142,7 @@ public final class WebMonitorUtils {
 		}
 	}
 
-	public static Map<String, String> fromKeyValueJsonArray (JSONArray parsed) throws JSONException {
+	public static Map<String, String> fromKeyValueJsonArray(JSONArray parsed) throws JSONException {
 		Map<String, String> hashMap = new HashMap<>();
 
 		for (int i = 0; i < parsed.length(); i++) {
@@ -194,6 +189,4 @@ public final class WebMonitorUtils {
 	private WebMonitorUtils() {
 		throw new RuntimeException();
 	}
-
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3682210/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 9c37a95..972e451 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -32,13 +32,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.concurrent.duration.FiniteDuration;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 @RunWith(Parameterized.class)
 public class WebFrontendITCase extends MultipleProgramsTestBase {
@@ -56,8 +53,6 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 		port = webMonitor.getServerPort();
 	}
 
-	static final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
 	public WebFrontendITCase(TestExecutionMode m) {
 		super(m);
 	}
@@ -115,7 +110,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 	@Test
 	public void getLogAndStdoutFiles() {
 		try {
-			WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(cluster.configuration());
+			WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
 
 			FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
 			String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log");
@@ -124,7 +119,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 			FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
 			logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
 			Assert.assertTrue(logs.contains("job manager out"));
-		}catch(Throwable e) {
+		} catch(Throwable e) {
 			e.printStackTrace();
 			Assert.fail(e.getMessage());
 		}
@@ -141,7 +136,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 			Assert.assertEquals(
 					cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
 					conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
-		} catch(Throwable e) {
+		} catch (Throwable e) {
 			e.printStackTrace();
 			Assert.fail(e.getMessage());
 		}


Mime
View raw message