flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/3] [YARN] properly set diagnostics messages on failures
Date Wed, 20 Aug 2014 09:15:56 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 859490533 -> e8f2e9d0e


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 6c3b2b9..7856652 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -40,33 +40,34 @@ import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
+
 /**
  * This class sets up a web-server that contains a web frontend to display information about running jobs.
  * It instantiates and configures an embedded jetty server.
  */
 public class WebInfoServer {
-	
+
 	/**
 	 * The log for this class.
 	 */
 	private static final Log LOG = LogFactory.getLog(WebInfoServer.class);
-	
+
 	/**
 	 * The jetty server serving all requests.
 	 */
 	private final Server server;
-	
+
 	/**
 	 * Port for info server
 	 */
 	private int port;
-	
+
 	/**
 	 * Creates a new web info server. The server runs the servlets that implement the logic
-	 * to list all present information concerning the job manager 
-	 * 
+	 * to list all present information concerning the job manager
+	 *
 	 * @param nepheleConfig
-	 *        The configuration for the nephele job manager. 
+	 *        The configuration for the nephele job manager.
 	 * @param port
 	 *        The port to launch the server on.
 	 * @throws IOException
@@ -74,18 +75,24 @@ public class WebInfoServer {
 	 */
 	public WebInfoServer(Configuration nepheleConfig, int port, JobManager jobmanager) throws IOException {
 		this.port = port;
-		
+
 		// if no explicit configuration is given, use the global configuration
 		if (nepheleConfig == null) {
 			nepheleConfig = GlobalConfiguration.getConfiguration();
 		}
-		
+
 		// get base path of Flink installation
-		String basePath = nepheleConfig.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
-		String webDirPath = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ROOT_PATH);
-		String logDirPath = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY,
-				basePath+"/log");
-		
+		final String basePath = nepheleConfig.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
+		final String webDirPath = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ROOT_PATH);
+		final String[] logDirPaths = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY,
+				basePath+"/log").split(","); // YARN allows to specify multiple log directories
+
+		final File[] logDirFiles = new File[logDirPaths.length];
+		int i = 0;
+		for(String path : logDirPaths) {
+			logDirFiles[i++] = new File(path);
+		}
+
 		File webDir;
 		if(webDirPath.startsWith("/")) {
 			// absolute path
@@ -94,12 +101,12 @@ public class WebInfoServer {
 			// path relative to base dir
 			webDir = new File(basePath+"/"+webDirPath);
 		}
-		
-		
+
+
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Setting up web info server, using web-root directory '" + webDir.getAbsolutePath() + "'.");
 			//LOG.info("Web info server will store temporary files in '" + tmpDir.getAbsolutePath());
-	
+
 			LOG.info("Web info server will display information about nephele job-manager on "
 				+ nepheleConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ", port "
 				+ port
@@ -108,17 +115,17 @@ public class WebInfoServer {
 
 		// ensure that the directory with the web documents exists
 		if (!webDir.exists()) {
-			throw new FileNotFoundException("Cannot start jobmanager web info server. The directory containing the web documents does not exist: " 
+			throw new FileNotFoundException("Cannot start jobmanager web info server. The directory containing the web documents does not exist: "
 				+ webDir.getAbsolutePath());
 		}
-		
+
 		server = new Server(port);
 
 		// ----- the handlers for the servlets -----
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
 		servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager)), "/jobsInfo");
-		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(new File(logDirPath))), "/logInfo");
+		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
 		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager)), "/setupInfo");
 		servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");
 
@@ -172,10 +179,10 @@ public class WebInfoServer {
 			server.setHandler(handlers);
 		}
 	}
-	
+
 	/**
 	 * Starts the web frontend server.
-	 * 
+	 *
 	 * @throws Exception
 	 *         Thrown, if the start fails.
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
index f87a1dd..9286def 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
@@ -177,7 +177,7 @@ public final class TestInstanceManager implements InstanceManager {
 	}
 
 	@Override
-	public int getNumberOfTaskTrackers() {
+	public int getNumberOfTaskManagers() {
 		throw new IllegalStateException("getNumberOfTaskTrackers called on TestInstanceManager");
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/tools/.gitignore
----------------------------------------------------------------------
diff --git a/tools/.gitignore b/tools/.gitignore
new file mode 100644
index 0000000..2546bad
--- /dev/null
+++ b/tools/.gitignore
@@ -0,0 +1 @@
+merge_pull_request.sh
\ No newline at end of file


Mime
View raw message