flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-1631] [jobmanager] Deactivate web frontend in parallel tests (prevent port collisions)
Date Tue, 03 Mar 2015 22:00:44 GMT
Repository: flink
Updated Branches:
  refs/heads/master d50e8bff6 -> 94a66d570


[FLINK-1631] [jobmanager] Deactivate web frontend in parallel tests (prevent port collisions)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94a66d57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94a66d57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94a66d57

Branch: refs/heads/master
Commit: 94a66d570e4bb40824813911a4f1bb47a8bf8b90
Parents: a1162b7
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Mar 3 10:52:56 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Mar 3 21:53:31 2015 +0100

----------------------------------------------------------------------
 docs/config.md                                  |  2 +-
 .../flink/configuration/ConfigConstants.java    |  2 +-
 flink-dist/src/main/resources/flink-conf.yaml   |  8 ++++++-
 .../runtime/jobmanager/web/WebInfoServer.java   | 22 +++++++++++---------
 .../flink/runtime/jobmanager/JobManager.scala   |  8 ++++---
 .../JobManagerProcessReapingTest.java           |  7 ++++++-
 .../runtime/testingUtils/TestingCluster.scala   |  1 +
 .../test/util/ForkableFlinkMiniCluster.scala    |  2 ++
 .../apache/flink/yarn/ApplicationMaster.scala   | 12 ++++++-----
 9 files changed, 42 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94a66d57/docs/config.md
----------------------------------------------------------------------
diff --git a/docs/config.md b/docs/config.md
index edaefdf..b8cf06a 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -247,7 +247,7 @@ TaskManager heartbeat may be missing before the TaskManager is considered
failed.
 
 - `jobmanager.web.port`: Port of the JobManager's web interface that displays
 status of running jobs and execution time breakdowns of finished jobs
-(DEFAULT: 8081).
+(DEFAULT: 8081). Setting this value to `-1` disables the web frontend.
 - `jobmanager.web.history`: The number of latest jobs that the JobManager's web
 front-end in its history (DEFAULT: 5).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94a66d57/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5507a52..0f42a17 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -234,7 +234,7 @@ public final class ConfigConstants {
 	// ------------------------- JobManager Web Frontend ----------------------
 	
 	/**
-	 * The port for the pact web-frontend server.
+	 * The port for the runtime monitor web-frontend server.
 	 */
 	public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94a66d57/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 65a87b9..894137f 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -29,7 +29,7 @@ jobmanager.heap.mb: 256
 
 taskmanager.heap.mb: 512
 
-taskmanager.numberOfTaskSlots: -1
+taskmanager.numberOfTaskSlots: 1
 
 parallelization.degree.default: 1
 
@@ -37,8 +37,14 @@ parallelization.degree.default: 1
 # Web Frontend
 #==============================================================================
 
+# The port under which the web-based runtime monitor listens.
+# A value of -1 deactivates the web server.
+
 jobmanager.web.port: 8081
 
+# The port under which the standalone web client
+# (for job upload and submit) listens.
+
 webclient.port: 8080
 
 #==============================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/94a66d57/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 7f72370..287a273 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -42,7 +42,6 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import scala.concurrent.duration.FiniteDuration;
 
-
 /**
  * This class sets up a web-server that contains a web frontend to display information about
running jobs.
  * It instantiates and configures an embedded jetty server.
@@ -67,14 +66,16 @@ public class WebInfoServer {
 	/**
 	 * Port for info server
 	 */
-	private int port;
+	private final int port;
 
 	/**
 	 * Creates a new web info server. The server runs the servlets that implement the logic
 	 * to list all present information concerning the job manager
 	 *
-	 * @param config
-	 *        The configuration for the flink job manager.
+	 * @param config The Flink configuration.
+	 * @param jobmanager The ActorRef to the JobManager actor
+	 * @param archive The ActorRef to the archive for old jobs
+	 *
 	 * @throws IOException
 	 *         Thrown, if the server setup failed for an I/O related reason.
 	 */
@@ -86,10 +87,13 @@ public class WebInfoServer {
 			throw new NullPointerException();
 		}
 
-		final FiniteDuration timeout = AkkaUtils.getTimeout(config);
-		
 		this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
 				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+		if (this.port <= 0) {
+			throw new IllegalArgumentException("Invalid port for the webserver: " + this.port);
+		}
+
+		final FiniteDuration timeout = AkkaUtils.getTimeout(config);
 
 		// get base path of Flink installation
 		final String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
@@ -117,7 +121,6 @@ public class WebInfoServer {
 
 		server = new Server(port);
 
-
 		// ----- the handlers for the servlets -----
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
@@ -197,8 +200,7 @@ public class WebInfoServer {
 		server.stop();
 	}
 
-	public Server getServer() {
-		return server;
+	public int getServerPort() {
+		return this.port;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94a66d57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7238e3d..b702fdc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -726,9 +726,11 @@ object JobManager {
       }
 
       // start the job manager web frontend
-      LOG.info("Starting JobManger web frontend")
-      val webServer = new WebInfoServer(configuration, jobManager, archiver)
-      webServer.start()
+      if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
+        LOG.info("Starting JobManger web frontend")
+        val webServer = new WebInfoServer(configuration, jobManager, archiver)
+        webServer.start()
+      }
     }
     catch {
       case t: Throwable => {

http://git-wip-us.apache.org/repos/asf/flink/blob/94a66d57/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 1bb22d8..e1f9b4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -26,6 +26,7 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -178,7 +179,11 @@ public class JobManagerProcessReapingTest {
 		public static void main(String[] args) {
 			try {
 				int port = Integer.parseInt(args[0]);
-				JobManager.runJobManager(new Configuration(), ExecutionMode.CLUSTER(), "localhost", port);
+
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
+
+				JobManager.runJobManager(config, ExecutionMode.CLUSTER(), "localhost", port);
 				System.exit(0);
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/94a66d57/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9e53bcb..f12bc24 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -41,6 +41,7 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem:
Boolea
     cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
     cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, NetUtils.getAvailablePort())
     cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
+    cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
 
     cfg.addAll(userConfig)
     cfg

http://git-wip-us.apache.org/repos/asf/flink/blob/94a66d57/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 1b088fc..3b7932e 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -65,6 +65,8 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
       config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
     }
 
+    config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
+
     super.generateConfiguration(config)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94a66d57/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index a0cc29f..4bc6014 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -93,12 +93,14 @@ object ApplicationMaster {
           val jobManagerPort = extActor.provider.getDefaultAddress.port.get
 
           // start the web info server
-          LOG.info("Starting Job Manger web frontend.")
-          config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
-          webserver = new WebInfoServer(config, jobManager, archiver)
-          webserver.start()
+          if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
+            LOG.info("Starting Job Manger web frontend.")
+            config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
+            webserver = new WebInfoServer(config, jobManager, archiver)
+            webserver.start()
+          }
 
-          val jobManagerWebPort = webserver.getServer.getConnectors()(0).getLocalPort
+          val jobManagerWebPort = if (webserver == null) -1 else webserver.getServerPort
 
           // generate configuration file for TaskManagers
           generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, ownHostname,


Mime
View raw message