flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [web client] Fix webclient config forwarding
Date Wed, 25 Feb 2015 14:10:51 GMT
Repository: flink
Updated Branches:
  refs/heads/master c0c4c9f1a -> c032725d3


[web client] Fix webclient config forwarding


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ca42190
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ca42190
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ca42190

Branch: refs/heads/master
Commit: 9ca421905773f25606d32f0e3f234384e4277b02
Parents: c0c4c9f
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Feb 24 20:27:04 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 25 13:56:00 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/WebFrontend.java    |  37 ++--
 .../flink/client/web/JobSubmissionServlet.java  |  62 ++++---
 .../flink/client/web/JobsInfoServlet.java       | 170 -------------------
 .../flink/client/web/PlanDisplayServlet.java    |  10 +-
 .../flink/client/web/WebInterfaceServer.java    |  46 ++---
 .../runtime/jobmanager/web/WebInfoServer.java   |   4 +-
 .../runtime/util/EnvironmentInformation.java    |  31 +++-
 .../flink/runtime/jobmanager/JobManager.scala   |  18 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   8 +-
 .../util/EnvironmentInformationTest.java        |   1 +
 10 files changed, 128 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
index bc88c7b..45f4391 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client;
 
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.client.web.WebInterfaceServer;
@@ -29,7 +29,6 @@ import org.apache.flink.configuration.GlobalConfiguration;
 /**
  * Main entry point for the web frontend. Creates a web server according to the configuration
  * in the given directory.
- * 
  */
 public class WebFrontend {
 	/**
@@ -38,28 +37,24 @@ public class WebFrontend {
 	private static final Logger LOG = LoggerFactory.getLogger(WebFrontend.class);
 
 	/**
-	 * Main method. accepts a single parameter, which is the config directory.
+	 * Main method. Accepts a single command line parameter, which is the config directory.
 	 * 
-	 * @param args
-	 *        The parameters to the entry point.
+	 * @param args The command line parameters.
 	 */
 	public static void main(String[] args) {
-		try {
-			// get the config directory first
-			String configDir = null;
 
-			if (args.length >= 2 && args[0].equals("--configDir")) {
-				configDir = args[1];
-			}
+		EnvironmentInformation.logEnvironmentInfo(LOG, "Web Client");
+		EnvironmentInformation.checkJavaVersion();
 
-			if (configDir == null) {
-				System.err
-					.println("Error: Configuration directory must be specified.\nWebFrontend --configDir
<directory>\n");
-				System.exit(1);
-				return;
-			}
+		// check the arguments
+		if (args.length < 2 || !args[0].equals("--configDir")) {
+			LOG.error("Wrong command line arguments. Usage: WebFrontend --configDir <directory>");
+			System.exit(1);
+		}
 
+		try {
 			// load the global configuration
+			String configDir = args[1];
 			GlobalConfiguration.loadConfiguration(configDir);
 			Configuration config = GlobalConfiguration.getConfiguration();
 			
@@ -68,15 +63,17 @@ public class WebFrontend {
 
 			// get the listening port
 			int port = config.getInteger(ConfigConstants.WEB_FRONTEND_PORT_KEY,
-				ConfigConstants.DEFAULT_WEBCLIENT_PORT);
+										ConfigConstants.DEFAULT_WEBCLIENT_PORT);
 
 			// start the server
 			WebInterfaceServer server = new WebInterfaceServer(config, port);
 			LOG.info("Starting web frontend server on port " + port + '.');
 			server.start();
 			server.join();
-		} catch (Throwable t) {
-			LOG.error("Unexpected exception: " + t.getMessage(), t);
+		}
+		catch (Throwable t) {
+			LOG.error("Exception while starting the web server: " + t.getMessage(), t);
+			System.exit(2);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 62414bf..21cd8e7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -89,11 +89,11 @@ public class JobSubmissionServlet extends HttpServlet {
 
 	private final Random rand;							// random number generator for UID
 	
-	private final Configuration nepheleConfig;
+	private final Configuration config;
 
 
-	public JobSubmissionServlet(Configuration nepheleConfig, File jobDir, File planDir) {
-		this.nepheleConfig = nepheleConfig;
+	public JobSubmissionServlet(Configuration config, File jobDir, File planDir) {
+		this.config = config;
 		this.jobStoreDirectory = jobDir;
 		this.planDumpDirectory = planDir;
 
@@ -139,7 +139,7 @@ public class JobSubmissionServlet extends HttpServlet {
 			}
 
 			// parse the arguments
-			List<String> params = null;
+			List<String> params;
 			try {
 				params = tokenizeArguments(args);
 			} catch (IllegalArgumentException iaex) {
@@ -166,7 +166,7 @@ public class JobSubmissionServlet extends HttpServlet {
 			}
 
 			// create the plan
-			String[] options = params.isEmpty() ? new String[0] : (String[]) params.toArray(new String[params.size()]);
+			String[] options = params.isEmpty() ? new String[0] : params.toArray(new String[params.size()]);
 			PackagedProgram program;
 			FlinkPlan optPlan;
 			Client client;
@@ -178,7 +178,7 @@ public class JobSubmissionServlet extends HttpServlet {
 					program = new PackagedProgram(jarFile, assemblerClass, options);
 				}
 				
-				client = new Client(nepheleConfig, program.getUserCodeClassLoader());
+				client = new Client(config, program.getUserCodeClassLoader());
 				
 				optPlan = client.getOptimizedPlan(program, parallelism);
 				
@@ -239,7 +239,7 @@ public class JobSubmissionServlet extends HttpServlet {
 				// we have a request to show the plan
 
 				// create a UID for the job
-				Long uid = null;
+				Long uid;
 				do {
 					uid = Math.abs(this.rand.nextLong());
 				} while (this.submittedJobs.containsKey(uid));
@@ -250,7 +250,8 @@ public class JobSubmissionServlet extends HttpServlet {
 				
 				if (optPlan instanceof StreamingPlan) {
 					((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile);
-				} else {
+				}
+				else {
 					PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
 					jsonGen.setEncodeForHTML(true);
 					jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
@@ -258,16 +259,24 @@ public class JobSubmissionServlet extends HttpServlet {
 				
 				// submit the job only, if it should not be suspended
 				if (!suspend) {
-					try {
-						client.run(program,(OptimizedPlan) optPlan, false);
-					} catch (Throwable t) {
-						LOG.error("Error submitting job to the job-manager.", t);
-						showErrorPage(resp, t.getMessage());
-						return;
-					} finally {
-						program.deleteExtractedLibraries();
+					if (optPlan instanceof OptimizedPlan) {
+						try {
+							client.run(program, (OptimizedPlan) optPlan, false);
+						}
+						catch (Throwable t) {
+							LOG.error("Error submitting job to the job-manager.", t);
+							showErrorPage(resp, t.getMessage());
+							return;
+						}
+						finally {
+							program.deleteExtractedLibraries();
+						}
 					}
-				} else {
+					else {
+						throw new RuntimeException("Not implemented for Streaming Job plans");
+					}
+				}
+				else {
 					try {
 						this.submittedJobs.put(uid, client.getJobGraph(program, optPlan));
 					}
@@ -285,23 +294,27 @@ public class JobSubmissionServlet extends HttpServlet {
 
 				// redirect to the plan display page
 				resp.sendRedirect("showPlan?id=" + uid + "&suspended=" + (suspend ? "true" : "false"));
-			} else {
+			}
+			else {
 				// don't show any plan. directly submit the job and redirect to the
-				// nephele runtime monitor
+				// runtime monitor
 				try {
 					client.run(program, parallelism, false);
-				} catch (Exception ex) {
+				}
+				catch (Exception ex) {
 					LOG.error("Error submitting job to the job-manager.", ex);
 					// HACK: Is necessary because Message contains whole stack trace
 					String errorMessage = ex.getMessage().split("\n")[0];
 					showErrorPage(resp, errorMessage);
 					return;
-				} finally {
+				}
+				finally {
 					program.deleteExtractedLibraries();
 				}
 				resp.sendRedirect(START_PAGE_URL);
 			}
-		} else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
+		}
+		else if (action.equals(ACTION_RUN_SUBMITTED_VALUE)) {
 			// --------------- run a job that has been submitted earlier, but was -------------------
 			// --------------- not executed because of a plan display -------------------
 
@@ -328,9 +341,10 @@ public class JobSubmissionServlet extends HttpServlet {
 
 			// submit the job
 			try {
-				Client client = new Client(nepheleConfig, getClass().getClassLoader());
+				Client client = new Client(config, getClass().getClassLoader());
 				client.run(job, false);
-			} catch (Exception ex) {
+			}
+			catch (Exception ex) {
 				LOG.error("Error submitting job to the job-manager.", ex);
 				resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
 				// HACK: Is necessary because Message contains whole stack trace

http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
deleted file mode 100644
index 381ee33..0000000
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client.web;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
-import scala.concurrent.Await;
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.Future;
-
-
-public class JobsInfoServlet extends HttpServlet {
-	/**
-	 * Serial UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = 558077298726449201L;
-
-	// ------------------------------------------------------------------------
-
-	private final Configuration config;
-
-	private final ActorSystem system;
-
-	private final FiniteDuration timeout;
-
-	private final ActorRef jobmanager;
-	
-	public JobsInfoServlet(Configuration flinkConfig) {
-		this.config = flinkConfig;
-		system = ActorSystem.create("JobsInfoServletActorSystem",
-				AkkaUtils.getDefaultAkkaConfig());
-		this.timeout = AkkaUtils.getTimeout(flinkConfig);
-
-		String jmHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-		int jmPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-
-		InetSocketAddress address = new InetSocketAddress(jmHost, jmPort);
-
-		Future<ActorRef> jobManagerFuture = JobManager.getJobManagerRemoteReferenceFuture(address,
system, timeout);
-
-		try {
-			this.jobmanager = Await.result(jobManagerFuture, timeout);
-		} catch (Exception ex) {
-			throw new RuntimeException("Could not find job manager at specified address " +
-					JobManager.getRemoteJobManagerAkkaURL(address) + ".");
-		}
-	}
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException,
IOException {
-		//resp.setContentType("application/json");
-		
-		try {
-			final Future<Object> response = Patterns.ask(jobmanager,
-					JobManagerMessages.getRequestRunningJobs(),
-					new Timeout(timeout));
-
-			Object result = null;
-
-			try {
-				result = Await.result(response, timeout);
-			} catch (Exception exception) {
-				throw new IOException("Could not retrieve the running jobs from the job manager.",
-						exception);
-			}
-
-			if(!(result instanceof RunningJobs)) {
-				throw new RuntimeException("ReqeustRunningJobs requires a response of type " +
-						"RunningJob. Instead the response is of type " + result.getClass() + ".");
-			} else {
-
-				final Iterator<ExecutionGraph> graphs = ((RunningJobs) result).
-						asJavaIterable().iterator();
-
-				resp.setStatus(HttpServletResponse.SC_OK);
-				PrintWriter wrt = resp.getWriter();
-				wrt.write("[");
-				while(graphs.hasNext()){
-					ExecutionGraph graph = graphs.next();
-					//Serialize job to json
-					wrt.write("{");
-					wrt.write("\"jobid\": \"" + graph.getJobID() + "\",");
-					if(graph.getJobName() != null) {
-						wrt.write("\"jobname\": \"" + graph.getJobName()+"\",");
-					}
-					wrt.write("\"status\": \""+ graph.getState() + "\",");
-					wrt.write("\"time\": " + graph.getStatusTimestamp(graph.getState()));
-					wrt.write("}");
-					//Write seperator between json objects
-					if(graphs.hasNext()) {
-						wrt.write(",");
-					}
-				}
-				wrt.write("]");
-			}
-		} catch (Throwable t) {
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			resp.getWriter().print(t.getMessage());
-		}
-	}
-
-	protected String escapeString(String str) {
-		int len = str.length();
-		char[] s = str.toCharArray();
-		StringBuilder sb = new StringBuilder();
-
-		for (int i = 0; i < len; i += 1) {
-			char c = s[i];
-			if ((c == '\\') || (c == '"') || (c == '/')) {
-				sb.append('\\');
-				sb.append(c);
-			} else if (c == '\b') {
-				sb.append("\\b");
-			} else if (c == '\t') {
-				sb.append("\\t");
-			} else if (c == '\n') {
-				sb.append("<br>");
-			} else if (c == '\f') {
-				sb.append("\\f");
-			} else if (c == '\r') {
-				sb.append("\\r");
-			} else {
-				if (c < ' ') {
-					// Unreadable throw away
-				} else {
-					sb.append(c);
-				}
-			}
-		}
-
-		return sb.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-clients/src/main/java/org/apache/flink/client/web/PlanDisplayServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/PlanDisplayServlet.java
b/flink-clients/src/main/java/org/apache/flink/client/web/PlanDisplayServlet.java
index 1c2663f..9043a2e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/PlanDisplayServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/PlanDisplayServlet.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client.web;
 
 import java.io.IOException;
@@ -27,11 +26,11 @@ import java.util.Map;
 
 import javax.servlet.http.HttpServletRequest;
 
+/**
+ * Simple servlet that displays the visualization of a data flow plan.
+ */
 public class PlanDisplayServlet extends GUIServletStub {
-	
-	/**
-	 * Serial UID for serialization interoperability.
-	 */
+
 	private static final long serialVersionUID = 3610115341264927614L;
 	
 	
@@ -83,7 +82,6 @@ public class PlanDisplayServlet extends GUIServletStub {
 				URI request = new URI(req.getRequestURL().toString());
 				URI vizURI = new URI(request.getScheme(), null, request.getHost(), runtimeVisualizationPort,
null, null, null);
 				this.runtimeVisURL = vizURI.toString();
-				System.out.println(this.runtimeVisURL);
 			} catch (URISyntaxException e) {
 				; // ignore and simply do not forward
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
index ad7b6d4..6384c9a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client.web;
 
 import java.io.File;
@@ -42,7 +41,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
 /**
- * This class sets up the web-server that serves the web frontend. It instantiates and
+ * This class sets up the web-server that serves the web client. It instantiates and
  * configures an embedded jetty server.
  */
 public class WebInterfaceServer {
@@ -65,25 +64,22 @@ public class WebInterfaceServer {
 	 * It serves the asynchronous requests for the plans and all other static resources, like
 	 * static web pages, stylesheets or javascript files.
 	 * 
-	 * @param nepheleConfig
-	 *        The configuration for the nephele job manager. All compiled jobs will be sent
-	 *        to the manager described by this configuration.
+	 * @param config
+	 *        The configuration for the JobManager. All jobs will be sent
+	 *        to the JobManager described by this configuration.
 	 * @param port
 	 *        The port to launch the server on.
 	 * @throws IOException
 	 *         Thrown, if the server setup failed for an I/O related reason.
 	 */
-	public WebInterfaceServer(Configuration nepheleConfig, int port)
-																	throws IOException {
-		Configuration config = GlobalConfiguration.getConfiguration();
-
+	public WebInterfaceServer(Configuration config, int port) throws IOException {
 		// if no explicit configuration is given, use the global configuration
-		if (nepheleConfig == null) {
-			nepheleConfig = config;
+		if (config == null) {
+			config = GlobalConfiguration.getConfiguration();
 		}
 		
 		// get base path of Flink installation
-		String basePath = nepheleConfig.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY,"");
+		String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY,"");
 
 		File tmpDir;
 		File uploadDir;
@@ -100,9 +96,7 @@ public class WebInterfaceServer {
 			ConfigConstants.DEFAULT_WEB_TMP_DIR);
 		
 		tmpDir = new File(tmpDirPath);
-		if(tmpDir.isAbsolute()) {
-			// absolute path, everything all right
-		} else {
+		if (!tmpDir.isAbsolute()) {
 			// path relative to base dir
 			tmpDir = new File(basePath+"/"+tmpDirPath);
 		}
@@ -111,9 +105,7 @@ public class WebInterfaceServer {
 				ConfigConstants.DEFAULT_WEB_JOB_STORAGE_DIR);
 		
 		uploadDir = new File(uploadDirPath);
-		if(uploadDir.isAbsolute()) {
-			// absolute path, everything peachy
-		} else {
+		if (!uploadDir.isAbsolute()) {
 			// path relative to base dir
 			uploadDir = new File(basePath+"/"+uploadDirPath);
 		}
@@ -122,21 +114,19 @@ public class WebInterfaceServer {
 				ConfigConstants.DEFAULT_WEB_PLAN_DUMP_DIR);
 		
 		planDumpDir = new File(planDumpDirPath);
-		if(planDumpDir.isAbsolute()) {
-			// absolute path, nice and dandy
-		} else {
+		if (!planDumpDir.isAbsolute()) {
 			// path relative to base dir
 			planDumpDir = new File(basePath+"/"+planDumpDirPath);
 		}
 		
 		if (LOG.isInfoEnabled()) {
-			LOG.info("Setting up web frontend server, using web-root directory '" +
-					webRootDir.toExternalForm()	+ "'.");
+			LOG.info("Setting up web client server, using web-root directory '" +
+					webRootDir.toExternalForm() + "'.");
 			LOG.info("Web frontend server will store temporary files in '" + tmpDir.getAbsolutePath()
 				+ "', uploaded jobs in '" + uploadDir.getAbsolutePath() + "', plan-json-dumps in '"
 				+ planDumpDir.getAbsolutePath() + "'.");
 	
-			LOG.info("Web-frontend will submit jobs to nephele job-manager on "
+			LOG.info("Web client will submit jobs to JobManager at "
 				+ config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ", port "
 				+ config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 				+ ".");
@@ -149,18 +139,16 @@ public class WebInterfaceServer {
 		checkAndCreateDirectories(uploadDir, true);
 		checkAndCreateDirectories(planDumpDir, true);
 		
-		int jobManagerWebPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+		int jobManagerWebPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+												ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
 
 		// ----- the handlers for the servlets -----
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
 		servletContext.addServlet(new ServletHolder(new PactJobJSONServlet(uploadDir)), "/pactPlan");
-		servletContext.addServlet(new ServletHolder(new JobsInfoServlet(nepheleConfig)),
-				"/jobsInfo");
 		servletContext.addServlet(new ServletHolder(new PlanDisplayServlet(jobManagerWebPort)),
"/showPlan");
 		servletContext.addServlet(new ServletHolder(new JobsServlet(uploadDir, tmpDir, "launch.html")),
"/jobs");
-		servletContext.addServlet(new ServletHolder(new JobSubmissionServlet(nepheleConfig, uploadDir,
planDumpDir)),
-			"/runJob");
+		servletContext.addServlet(new ServletHolder(new JobSubmissionServlet(config, uploadDir,
planDumpDir)), "/runJob");
 
 		// ----- the hander serving the written pact plans -----
 		ResourceHandler pactPlanHandler = new ResourceHandler();

http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index fcb41cb..7f72370 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -99,7 +99,7 @@ public class WebInfoServer {
 		URL webRootDir = this.getClass().getClassLoader().getResource(WEB_ROOT_DIR);
 
 		if(webRootDir == null) {
-			throw new FileNotFoundException("Cannot start jobmanager web info server. The " +
+			throw new FileNotFoundException("Cannot start JobManager web info server. The " +
 					"resource " + WEB_ROOT_DIR + " is not included in the jar.");
 		}
 
@@ -110,7 +110,7 @@ public class WebInfoServer {
 		}
 
 		if (LOG.isInfoEnabled()) {
-			LOG.info("Setting up web info server, using web-root directory" +
+			LOG.info("Setting up web info server, using web-root directory " +
 					webRootDir.toExternalForm()	+ ".");
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index d2147e4..1fb6422 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
 import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
+import java.util.List;
 import java.util.Properties;
 
 import org.slf4j.Logger;
@@ -176,6 +177,22 @@ public class EnvironmentInformation {
 	}
 
 	/**
+	 * Gets the system parameters and environment parameters that were passed to the JVM on
startup.
+	 *
+	 * @return The options passed to the JVM on startup.
+	 */
+	public static String[] getJvmStartupOptionsArray() {
+		try {
+			RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+			List<String> options = bean.getInputArguments();
+			return options.toArray(new String[options.size()]);
+		}
+		catch (Throwable t) {
+			return new String[0];
+		}
+	}
+
+	/**
 	 * Gets the directory for temporary files, as returned by the JVM system property "java.io.tmpdir".
 	 *
 	 * @return The directory for temporary files.
@@ -199,7 +216,7 @@ public class EnvironmentInformation {
 			String user = getUserRunning();
 			
 			String jvmVersion = getJvmVersion();
-			String options = getJvmStartupOptions();
+			String[] options = getJvmStartupOptionsArray();
 			
 			String javaHome = System.getenv("JAVA_HOME");
 			
@@ -210,7 +227,17 @@ public class EnvironmentInformation {
 					+ "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
 			log.info(" Current user: " + user);
 			log.info(" JVM: " + jvmVersion);
-			log.info(" Startup Options: " + options);
+
+			if (options.length == 0) {
+				log.info(" Startup Options: (none)");
+			}
+			else {
+				log.info(" Startup Options:");
+				for (String s: options) {
+					log.info("    " + s);
+				}
+			}
+
 			log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
 			log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome));
 			log.info("--------------------------------------------------------------------------------");

http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0630115..415a20c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -645,7 +645,9 @@ object JobManager {
             runJobManager(configuration, executionMode, listeningHost, listeningPort)
           }
         })
-      } else {
+      }
+      else {
+        LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
         runJobManager(configuration, executionMode, listeningHost, listeningPort)
       }
     }
@@ -679,10 +681,15 @@ object JobManager {
     LOG.info("Starting JobManager")
 
     // Bring up the job manager actor system first, bind it to the given address.
-    LOG.debug("Starting JobManager actor system")
+    LOG.info("Starting JobManager actor system at {}:{}", listeningAddress, listeningPort)
 
     val jobManagerSystem = try {
-      AkkaUtils.createActorSystem(configuration, Some((listeningAddress, listeningPort)))
+      val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
+                                               Some((listeningAddress, listeningPort)))
+      if (LOG.isDebugEnabled) {
+        LOG.debug("Using akka configuration\n " + akkaConfig)
+      }
+      AkkaUtils.createActorSystem(akkaConfig)
     }
     catch {
       case t: Throwable => {
@@ -700,11 +707,12 @@ object JobManager {
 
     try {
       // bring up the job manager actor
-      LOG.debug("Starting JobManager actor")
+      LOG.info("Starting JobManager actor")
       val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem)
 
       // start a process reaper that watches the JobManager. If the JobManager actor dies,
       // the process reaper will kill the JVM process (to ensure easy failure detection)
+      LOG.debug("Starting JobManager process reaper")
       jobManagerSystem.actorOf(
         Props(classOf[ProcessReaper], jobManager, LOG, RUNTIME_FAILURE_RETURN_CODE),
         "JobManager_Process_Reaper")
@@ -763,8 +771,8 @@ object JobManager {
 
     parser.parse(args, JobManagerCLIConfiguration()) map {
       config =>
+        LOG.info("Loading configuration from " + config.configDir)
         GlobalConfiguration.loadConfiguration(config.configDir)
-
         val configuration = GlobalConfiguration.getConfiguration
 
         if (config.configDir != null && new File(config.configDir).isDirectory) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4c85e5b..7bfa370 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -806,6 +806,7 @@ object TaskManager {
 
     // load the configuration
     try {
+      LOG.info("Loading configuration from " + cliConfig.configDir)
       GlobalConfiguration.loadConfiguration(cliConfig.configDir)
       GlobalConfiguration.getConfiguration()
     }
@@ -906,7 +907,12 @@ object TaskManager {
     LOG.info("Starting TaskManager actor system")
 
     val taskManagerSystem = try {
-      AkkaUtils.createActorSystem(configuration, Some((taskManagerHostname, actorSystemPort)))
+      val akkaConfig = AkkaUtils.getAkkaConfig(configuration,
+                                               Some((taskManagerHostname, actorSystemPort)))
+      if (LOG.isDebugEnabled) {
+        LOG.debug("Using akka configuration\n " + akkaConfig)
+      }
+      AkkaUtils.createActorSystem(akkaConfig)
     }
     catch {
       case t: Throwable => {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ca42190/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
index d9fc6b9..64a676c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
@@ -47,6 +47,7 @@ public class EnvironmentInformationTest {
 	public void testEnvironmentMethods() {
 		try {
 			assertNotNull(EnvironmentInformation.getJvmStartupOptions());
+			assertNotNull(EnvironmentInformation.getJvmStartupOptionsArray());
 			assertNotNull(EnvironmentInformation.getJvmVersion());
 			assertNotNull(EnvironmentInformation.getRevisionInformation());
 			assertNotNull(EnvironmentInformation.getVersion());


Mime
View raw message