Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C8FCB1882D for ; Sun, 6 Dec 2015 21:35:19 +0000 (UTC) Received: (qmail 12540 invoked by uid 500); 6 Dec 2015 21:35:19 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 12495 invoked by uid 500); 6 Dec 2015 21:35:19 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 12480 invoked by uid 99); 6 Dec 2015 21:35:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 06 Dec 2015 21:35:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 858DDE027D; Sun, 6 Dec 2015 21:35:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Message-Id: <18a26b8cbde54a9d9c6b076678015c37@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-3125] [web dashboard] Web server starts also when JobManager log files cannot be accessed. Date: Sun, 6 Dec 2015 21:35:19 +0000 (UTC) Repository: flink Updated Branches: refs/heads/release-0.10 319e18045 -> a36822100 [FLINK-3125] [web dashboard] Web server starts also when JobManager log files cannot be accessed. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3682210 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3682210 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3682210 Branch: refs/heads/release-0.10 Commit: a368221006033ae8a48d2e81dc535a3eda0b0b87 Parents: 319e180 Author: Stephan Ewen Authored: Sun Dec 6 18:07:26 2015 +0100 Committer: Stephan Ewen Committed: Sun Dec 6 18:19:17 2015 +0100 ---------------------------------------------------------------------- .../runtime/webmonitor/WebRuntimeMonitor.java | 26 +++--- .../handlers/ConstantTextHandler.java | 67 ++++++++++++++ .../runtime/webmonitor/WebMonitorUtils.java | 91 +++++++++----------- .../flink/test/web/WebFrontendITCase.java | 11 +-- 4 files changed, 125 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a3682210/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 03a25a2..a7c6eb2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -35,6 +35,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; +import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler; import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler; import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler; import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler; @@ -99,8 +100,6 @@ public class WebRuntimeMonitor implements WebMonitor { private final Router router; - private final int configuredPort; - private final ServerBootstrap bootstrap; private final Promise jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>(); @@ -118,6 +117,7 @@ public class WebRuntimeMonitor implements WebMonitor { Configuration config, LeaderRetrievalService leaderRetrievalService, ActorSystem actorSystem) throws IOException, InterruptedException { + this.leaderRetrievalService = checkNotNull(leaderRetrievalService); final WebMonitorConfig cfg = new WebMonitorConfig(config); @@ -127,15 +127,12 @@ public class WebRuntimeMonitor implements WebMonitor { webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName); LOG.info("Using directory {} for the web interface files", webRootDir); - final WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(config); - - LOG.info("Serving job manager log from {}", logFiles.logFile.getAbsolutePath()); - LOG.info("Serving job manager stdout from {}", logFiles.stdOutFile.getAbsolutePath()); - + final WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(config); + // port configuration - this.configuredPort = cfg.getWebFrontendPort(); - if (this.configuredPort < 0) { - throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort); + int configuredPort = cfg.getWebFrontendPort(); + if (configuredPort < 0) { + throw new IllegalArgumentException("Web frontend port is invalid: " + configuredPort); } timeout = AkkaUtils.getTimeout(config); @@ -144,7 +141,7 @@ public class WebRuntimeMonitor implements WebMonitor { retriever = new JobManagerRetriever(this, actorSystem, lookupTimeout, timeout); ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(); - + router = new Router() // config how to interact with this web server .GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval()))) @@ -183,8 +180,11 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) // log and stdout - .GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile)) - .GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile)) + .GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") : + new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile)) + + .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") : + new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile)) // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) http://git-wip-us.apache.org/repos/asf/flink/blob/a3682210/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java new file mode 100644 index 0000000..30d825f --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.router.KeepAliveWrite; +import io.netty.handler.codec.http.router.Routed; + +import org.apache.flink.runtime.webmonitor.files.MimeTypes; + +import java.io.UnsupportedEncodingException; + +/** + * Responder that returns a constant String. + */ +@ChannelHandler.Sharable +public class ConstantTextHandler extends SimpleChannelInboundHandler { + + private static final String MIME_TYPE = MimeTypes.getMimeTypeForExtension("txt"); + + private final byte[] encodedText; + + public ConstantTextHandler(String text) { + try { + this.encodedText = text.getBytes("UTF-8"); + } + catch (UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { + HttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText)); + + response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, MIME_TYPE); + + KeepAliveWrite.flush(ctx, routed.request(), response); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a3682210/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index a7a3458..41cb83b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; + import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -29,9 +29,11 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.webmonitor.JobDetails; + import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,60 +55,53 @@ public final class WebMonitorUtils { /** * Singleton to hold the log and stdout file */ - public static class LogFiles { - - private static LogFiles INSTANCE; + public static class LogFileLocation { public final File logFile; public final File stdOutFile; - private LogFiles(String logFile) { - this.logFile = checkFileLocation(logFile); - String stdOutFile = logFile.replaceFirst("\\.log$", ".out"); - this.stdOutFile = checkFileLocation(stdOutFile);; + private LogFileLocation(File logFile, File stdOutFile) { + this.logFile = logFile; + this.stdOutFile = stdOutFile; } + /** - * Verify log file location - * @param logFilePath Path to log file - * @return File or null if not a valid log file + * Finds the Flink log directory using log.file Java property that is set during startup. */ - private static File checkFileLocation (String logFilePath) { - File logFile = new File(logFilePath); - if (logFile.exists() && logFile.canRead()) { - return logFile; - } else { - throw new IllegalConfigurationException("Job manager log file was supposed to be at " + - logFile.getAbsolutePath() + " but it does not exist or is not readable."); + public static LogFileLocation find(Configuration config) { + final String logEnv = "log.file"; + String logFilePath = System.getProperty(logEnv); + + if (logFilePath == null) { + LOG.warn("Log file environment variable '{}' is not set.", logEnv); + logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null); + } + + // not configured, cannot serve log files + if (logFilePath == null || logFilePath.length() < 4) { + LOG.warn("JobManager log files are unavailable in the web dashboard. " + + "Log file location not found in environment variable '{}' or configuration key '{}'.", + logEnv, ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY); + return new LogFileLocation(null, null); } + + String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out"); + + LOG.info("Determined location of JobManager log file: {}", logFilePath); + LOG.info("Determined location of JobManager stdout file: {}", outFilePath); + + return new LogFileLocation(resolveFileLocation(logFilePath), resolveFileLocation(outFilePath)); } /** - * Finds the Flink log directory using log.file Java property that is set during startup. + * Verify log file location + * @param logFilePath Path to log file + * @return File or null if not a valid log file */ - public static LogFiles find(Configuration config) { - if (INSTANCE == null) { - - /** Figure out log file location based on 'log.file' VM argument **/ - final String logEnv = "log.file"; - String logFilePath = System.getProperty(logEnv); - - if (logFilePath == null) { - LOG.warn("Log file environment variable '{}' is not set.", logEnv); - logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null); - } - - if (logFilePath == null) { - throw new IllegalConfigurationException("JobManager log file not found. " + - "Can't serve log files. Log file location couldn't be determined via the " + - logEnv + " environment variable or the config constant " + - ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY); - } - - INSTANCE = new LogFiles(logFilePath); - } - - return INSTANCE; + private static File resolveFileLocation(String logFilePath) { + File logFile = new File(logFilePath); + return (logFile.exists() && logFile.canRead()) ? logFile : null; } } @@ -127,9 +122,9 @@ public final class WebMonitorUtils { // try to load and instantiate the class try { String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"; - Class clazz = Class.forName(classname).asSubclass(WebMonitor.class); - @SuppressWarnings("unchecked") - Constructor constructor = clazz.getConstructor(Configuration.class, + Class clazz = Class.forName(classname).asSubclass(WebMonitor.class); + + Constructor constructor = clazz.getConstructor(Configuration.class, LeaderRetrievalService.class, ActorSystem.class); return constructor.newInstance(config, leaderRetrievalService, actorSystem); @@ -147,7 +142,7 @@ public final class WebMonitorUtils { } } - public static Map fromKeyValueJsonArray (JSONArray parsed) throws JSONException { + public static Map fromKeyValueJsonArray(JSONArray parsed) throws JSONException { Map hashMap = new HashMap<>(); for (int i = 0; i < parsed.length(); i++) { @@ -194,6 +189,4 @@ public final class WebMonitorUtils { private WebMonitorUtils() { throw new RuntimeException(); } - - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/a3682210/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index 9c37a95..972e451 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -32,13 +32,10 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.concurrent.duration.FiniteDuration; -import java.io.File; import java.util.ArrayList; import java.util.Collection; import java.util.Map; -import java.util.concurrent.TimeUnit; @RunWith(Parameterized.class) public class WebFrontendITCase extends MultipleProgramsTestBase { @@ -56,8 +53,6 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { port = webMonitor.getServerPort(); } - static final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); - public WebFrontendITCase(TestExecutionMode m) { super(m); } @@ -115,7 +110,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { @Test public void getLogAndStdoutFiles() { try { - WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(cluster.configuration()); + WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration()); FileUtils.writeStringToFile(logFiles.logFile, "job manager log"); String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log"); @@ -124,7 +119,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out"); logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout"); Assert.assertTrue(logs.contains("job manager out")); - }catch(Throwable e) { + } catch(Throwable e) { e.printStackTrace(); Assert.fail(e.getMessage()); } @@ -141,7 +136,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { Assert.assertEquals( cluster.configuration().getString("taskmanager.numberOfTaskSlots", null), conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)); - } catch(Throwable e) { + } catch (Throwable e) { e.printStackTrace(); Assert.fail(e.getMessage()); }