flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-1529] [jobmanager] Improve error handling on JobManager startup
Date Fri, 13 Feb 2015 16:29:01 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6bec22877 -> 0a22b71c8


[FLINK-1529] [jobmanager] Improve error handling on JobManager startup

This closes #385


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dafd810
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dafd810
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dafd810

Branch: refs/heads/master
Commit: 1dafd810b7fc185a85d2e8b7666a2b8ab28d7787
Parents: 6bec228
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 11 19:01:41 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Feb 13 15:32:45 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 295 ++++++++++++++-----
 .../jobmanager/JobManagerCLIConfiguration.scala |   4 +-
 .../runtime/jobmanager/WithWebServer.scala      |   2 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   2 +-
 .../jobmanager/JobManagerStartupTest.java       |  90 ++++++
 .../runtime/testingUtils/TestingCluster.scala   |  21 +-
 .../testingUtils/TestingJobManager.scala        |   2 -
 .../runtime/testingUtils/TestingUtils.scala     |  17 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   6 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |  30 +-
 10 files changed, 369 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4636999..d25cc52 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,13 +20,12 @@ package org.apache.flink.runtime.jobmanager
 
 import java.io.{IOException, File}
 import java.net.InetSocketAddress
+
 import akka.actor.Status.Failure
-import akka.actor._
-import akka.pattern.ask
-import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration}
+import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph}
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.Acknowledge
@@ -45,18 +44,24 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, Heartbeat}
 import org.apache.flink.runtime.profiling.ProfilingUtils
+import org.apache.flink.util.InstantiationUtil
+
 import org.slf4j.LoggerFactory
+
+import akka.actor._
+import akka.pattern.ask
+
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import org.apache.flink.util.InstantiationUtil
+import scala.collection.JavaConverters._
 
 /**
  * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering
the
  * job status and managing the task managers. It is realized as an actor and receives amongst
others
  * the following messages:
  *
- *  - [[RegisterTaskManager]] is sent by a TaskManager which wants to registe at the job
manager.
+ *  - [[RegisterTaskManager]] is sent by a TaskManager which wants to register at the job
manager.
  *  A successful registration at the instance manager is acknowledged by [[AcknowledgeRegistration]]
  *
  *  - [[SubmitJob]] is sent by a client which wants to submit a job to the system. The submit
@@ -77,44 +82,22 @@ import org.apache.flink.util.InstantiationUtil
  *
  * - [[JobStatusChanged]] indicates that the status of job (RUNNING, CANCELING, FINISHED,
etc.) has
  * changed. This message is sent by the ExecutionGraph.
- *
- * @param configuration object with user provided configuration values
  */
-class JobManager(val configuration: Configuration) extends 
-Actor with ActorLogMessages with ActorLogging {
-  import context._
-  import scala.collection.JavaConverters._
+class JobManager(val configuration: Configuration,
+                 val instanceManager: InstanceManager,
+                 val scheduler: FlinkScheduler,
+                 val libraryCacheManager: BlobLibraryCacheManager,
+                 val archive: ActorRef,
+                 val accumulatorManager: AccumulatorManager,
+                 val profiler: Option[ActorRef],
+                 val defaultExecutionRetries: Int,
+                 val delayBetweenRetries: Long,
+                 implicit val timeout: FiniteDuration)
+  extends Actor with ActorLogMessages with ActorLogging {
 
-  implicit val timeout = AkkaUtils.getTimeout(configuration)
-
-  log.info(s"Starting job manager at ${self.path}.")
-
-  checkJavaVersion
-
-  val (archiveCount,
-    profiling,
-    cleanupInterval,
-    defaultExecutionRetries,
-    delayBetweenRetries) = JobManager.parseConfiguration(configuration)
-
-  // Props for the profiler actor
-  def profilerProps: Props = Props(classOf[JobManagerProfiler])
-
-  // Props for the archive actor
-  def archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
-
-  val profiler = profiling match {
-    case true => Some(context.actorOf(profilerProps, JobManager.PROFILER_NAME))
-    case false => None
-  }
-
-  val archive = context.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
+  import context._
 
-  val accumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
-  val instanceManager = new InstanceManager()
-  val scheduler = new FlinkScheduler()
-  val libraryCacheManager = new BlobLibraryCacheManager(
-                                        new BlobServer(configuration), cleanupInterval)
+  val LOG = JobManager.LOG
 
   // List of current jobs running
   val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
@@ -122,13 +105,17 @@ Actor with ActorLogMessages with ActorLogging {
   // Map of actors which want to be notified once a specific job terminates
   val finalJobStatusListener = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
 
-  instanceManager.addInstanceListener(scheduler)
 
-  log.info("Started job manager. Waiting for incoming messages.")
+  override def preStart(): Unit = {
+    LOG.info(s"Starting JobManager at ${self.path}.")
+  }
 
   override def postStop(): Unit = {
     log.info(s"Stopping job manager ${self.path}.")
 
+    archive ! PoisonPill
+    profiler.map( ref => ref ! PoisonPill )
+
     for((e,_) <- currentJobs.values){
       e.fail(new Exception("The JobManager is shutting down."))
     }
@@ -304,7 +291,7 @@ Actor with ActorLogMessages with ActorLogging {
           executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex)
         case None =>
           log.error("Cannot find execution graph for job ID {} to schedule or update consumers",
-            jobId);
+            jobId)
           sender ! Failure(new IllegalStateException("Cannot find execution graph for job
ID " +
             jobId + " to schedule or update consumers."))
       }
@@ -530,54 +517,114 @@ Actor with ActorLogMessages with ActorLogging {
         log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
     }
   }
-
-  private def checkJavaVersion(): Unit = {
-    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
-      log.warning("Warning: Flink is running with Java 6. " +
-        "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
-        "Flink currently supports Java 6, but may not in future releases," +
-        " due to the unavailability of bug fixes security patched.")
-    }
-  }
 }
 
 object JobManager {
+  
   import ExecutionMode._
+
   val LOG = LoggerFactory.getLogger(classOf[JobManager])
+
   val FAILURE_RETURN_CODE = 1
+
   val JOB_MANAGER_NAME = "jobmanager"
   val EVENT_COLLECTOR_NAME = "eventcollector"
   val ARCHIVE_NAME = "archive"
   val PROFILER_NAME = "profiler"
 
   def main(args: Array[String]): Unit = {
+
+    // startup checks and logging
     EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
-    val (configuration, executionMode, listeningAddress) = parseArgs(args)
+    checkJavaVersion()
+
+    val (configuration: Configuration,
+         executionMode: ExecutionMode,
+         listeningAddress:  Option[(String, Int)]) =
+    try {
+      parseArgs(args)
+    }
+    catch {
+      case t: Throwable => {
+        LOG.error(t.getMessage(), t)
+        System.exit(FAILURE_RETURN_CODE)
+        null
+      }
+    }
 
-      if(SecurityUtils.isSecurityEnabled) {
+    try {
+      if (SecurityUtils.isSecurityEnabled) {
         LOG.info("Security is enabled. Starting secure JobManager.")
         SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
           override def run(): Unit = {
-            start(configuration, executionMode, listeningAddress)
+            runJobManager(configuration, executionMode, listeningAddress)
           }
         })
       } else {
-        start(configuration, executionMode, listeningAddress)
+        runJobManager(configuration, executionMode, listeningAddress)
       }
+    }
+    catch {
+      case t: Throwable => {
+        LOG.error("Failed to start JobManager.", t)
+        System.exit(FAILURE_RETURN_CODE)
+      }
+    }
   }
 
-  def start(configuration: Configuration, executionMode: ExecutionMode,
-            listeningAddress : Option[(String, Int)]): Unit = {
-    val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress)
 
-    startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem)
+  def runJobManager(configuration: Configuration,
+                    executionMode: ExecutionMode,
+                    listeningAddress: Option[(String, Int)]) : Unit = {
 
-    if(executionMode.equals(LOCAL)){
-      TaskManager.startActorWithConfiguration("", configuration,
-        localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
+    LOG.info("Starting JobManager")
+    LOG.debug("Starting JobManager actor system")
+
+    val jobManagerSystem = try {
+      AkkaUtils.createActorSystem(configuration, listeningAddress)
+    }
+    catch {
+      case t: Throwable => {
+        if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
+          val cause = t.getCause()
+          if (cause != null && t.getCause().isInstanceOf[java.net.BindException])
{
+            val address = listeningAddress match {
+              case Some((host, port)) => host + ":" + port
+              case None => "unknown"
+            }
+
+            throw new Exception("Unable to create JobManager at address " + address + ":
"
+              + cause.getMessage(), t)
+          }
+        }
+        throw new Exception("Could not create JobManager actor system", t)
+      }
     }
 
-    jobManagerSystem.awaitTermination()
+    try {
+      LOG.debug("Starting JobManager actor")
+
+      startActor(configuration, jobManagerSystem, true)
+
+      if(executionMode.equals(LOCAL)){
+        LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution")
+
+        TaskManager.startActorWithConfiguration("", configuration,
+          localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
+      }
+
+      jobManagerSystem.awaitTermination()
+    }
+    catch {
+      case t: Throwable => {
+        try {
+          jobManagerSystem.shutdown()
+        } catch {
+          case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt)
+        }
+        throw t
+      }
+    }
   }
 
   /**
@@ -622,8 +669,7 @@ object JobManager {
 
         (configuration, config.executionMode, listeningAddress)
     } getOrElse {
-      LOG.error("CLI Parsing failed. Usage: " + parser.usage)
-      sys.exit(FAILURE_RETURN_CODE)
+      throw new Exception("Wrong arguments. Usage: " + parser.usage)
     }
   }
 
@@ -637,14 +683,15 @@ object JobManager {
   def parseConfiguration(configuration: Configuration): (Int, Boolean, Long, Int, Long) =
{
     val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
-    val profilingEnabled = configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)
+    val profilingEnabled = configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, false)
 
-    val cleanupInterval = configuration.getLong(ConfigConstants
-      .LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+    val cleanupInterval = configuration.getLong(
+      ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
-    val executionRetries = configuration.getInteger(ConfigConstants
-      .DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES)
+    val executionRetries = configuration.getInteger(
+      ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
+      ConfigConstants.DEFAULT_EXECUTION_RETRIES)
 
     val delayBetweenRetries = 2 * configuration.getLong(
       ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
@@ -653,11 +700,95 @@ object JobManager {
     (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries)
   }
 
-  def startActor(configuration: Configuration)(implicit actorSystem: ActorSystem): ActorRef
= {
-    startActor(Props(classOf[JobManager], configuration))
+  /**
+   * Create the job manager members as (instanceManager, scheduler, libraryCacheManager,
+   *              archiverProps, accumulatorManager, profiler, defaultExecutionRetries,
+   *              delayBetweenRetries, timeout)
+   *
+   * @param configuration The configuration from which to parse the config values.
+   * @return The members for a default JobManager.
+   */
+  def createJobManagerComponents(configuration: Configuration) :
+    (InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
+      Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, Int) = {
+
+    val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
+
+    val (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries)
=
+      parseConfiguration(configuration)
+
+    val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
+
+    val profilerProps: Option[Props] = if (profilingEnabled) {
+      Some(Props(classOf[JobManagerProfiler]))
+    } else {
+      None
+    }
+
+    val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
+
+    var blobServer: BlobServer = null
+    var instanceManager: InstanceManager = null
+    var scheduler: FlinkScheduler = null
+    var libraryCacheManager: BlobLibraryCacheManager = null
+
+    try {
+      blobServer = new BlobServer(configuration)
+      instanceManager = new InstanceManager()
+      scheduler = new FlinkScheduler()
+      libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
+
+      instanceManager.addInstanceListener(scheduler)
+    }
+    catch {
+      case t: Throwable => {
+        if (libraryCacheManager != null) {
+          libraryCacheManager.shutdown()
+        }
+        if (scheduler != null) {
+          scheduler.shutdown()
+        }
+        if (instanceManager != null) {
+          instanceManager.shutdown()
+        }
+        if (blobServer != null) {
+          blobServer.shutdown()
+        }
+        throw t
+      }
+    }
+
+    (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
+      profilerProps, executionRetries, delayBetweenRetries, timeout, archiveCount)
+  }
+
+  def startActor(configuration: Configuration,
+                 actorSystem: ActorSystem,
+                 withWebServer: Boolean): ActorRef = {
+
+    val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
+      profilerProps, executionRetries, delayBetweenRetries,
+      timeout, _) = createJobManagerComponents(configuration)
+
+    val profiler: Option[ActorRef] =
+                 profilerProps.map( props => actorSystem.actorOf(props, PROFILER_NAME)
)
+
+    val archiver: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
+
+    val jobManagerProps = if (withWebServer) {
+      Props(new JobManager(configuration, instanceManager, scheduler,
+        libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
+        delayBetweenRetries, timeout) with WithWebServer)
+    } else {
+      Props(classOf[JobManager], configuration, instanceManager, scheduler,
+        libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
+        delayBetweenRetries, timeout)
+    }
+
+    startActor(jobManagerProps, actorSystem)
   }
 
-  def startActor(props: Props)(implicit actorSystem: ActorSystem): ActorRef = {
+  def startActor(props: Props, actorSystem: ActorSystem): ActorRef = {
     actorSystem.actorOf(props, JOB_MANAGER_NAME)
   }
 
@@ -688,4 +819,18 @@ object JobManager {
   FiniteDuration): ActorRef = {
     AkkaUtils.getReference(getRemoteAkkaURL(address.getHostName + ":" + address.getPort))
   }
+
+  private def checkJavaVersion(): Unit = {
+    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
+      LOG.warn("Flink has been started with Java 6. " +
+        "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
+        "Flink may drop support for Java 6 in future releases, due to the " +
+        "unavailability of bug fixes and security patches.")
+    }
+  }
+
+  // --------------------------------------------------------------------------
+
+  class ParseException(message: String) extends Exception(message) {}
+  
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
index d08aecc..a932977 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
@@ -24,5 +24,5 @@ object ExecutionMode extends Enumeration{
   val CLUSTER = Value
 }
 
-case class JobManagerCLIConfiguration(configDir: String = null, executionMode: ExecutionMode
-.ExecutionMode = ExecutionMode.CLUSTER) {}
+case class JobManagerCLIConfiguration(configDir: String = null, 
+          executionMode: ExecutionMode.ExecutionMode = ExecutionMode.CLUSTER) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
index 62e56fe..bc83b9f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 trait WithWebServer extends Actor {
   that: JobManager =>
 
-  val webServer = new WebInfoServer(configuration,self, archive, timeout)
+  val webServer = new WebInfoServer(configuration, self, archive, timeout)
   webServer.start()
 
   abstract override def postStop(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 06d611a..d8d5b23 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -67,7 +67,7 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
   override def startJobManager(implicit system: ActorSystem):
   ActorRef = {
     val config = configuration.clone()
-    JobManager.startActor(config)
+    JobManager.startActor(config, system, false)
   }
 
   override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {

http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
new file mode 100644
index 0000000..3ad4238
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.net.NetUtils;
+import org.junit.Test;
+
+import scala.Some;
+import scala.Tuple2;
+
+public class JobManagerStartupTest {
+
+	@Test
+	public void testStartupWithPortInUse() {
+		
+		ServerSocket portOccupier = null;
+		final int portNum;
+		
+		try {
+			portNum = NetUtils.getAvailablePort();
+			portOccupier = new ServerSocket(portNum);
+		}
+		catch (Throwable t) {
+			// could not find free port, or open a connection there
+			return;
+		}
+		
+		try {
+			Tuple2<String, Object> connection = new Tuple2<String, Object>("localhost",
portNum);
+			JobManager.runJobManager(new Configuration(), ExecutionMode.CLUSTER(), new Some<Tuple2<String,
Object>>(connection));
+			fail("this should throw an exception");
+		}
+		catch (Exception e) {
+			// expected
+			assertTrue(e.getMessage().contains("Address already in use"));
+		}
+		finally {
+			try {
+				portOccupier.close();
+			}
+			catch (Throwable t) {}
+		}
+	}
+
+	@Test
+	public void testJobManagerStartupFails() {
+		final int portNum;
+		try {
+			portNum = NetUtils.getAvailablePort();
+		}
+		catch (Throwable t) {
+			// skip test if we cannot find a free port
+			return;
+		}
+		Tuple2<String, Object> connection = new Tuple2<String, Object>("localhost",
portNum);
+		Configuration failConfig = new Configuration();
+		failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/does-not-exist-no-sir");
+
+		try {
+			JobManager.runJobManager(failConfig, ExecutionMode.CLUSTER(), new Some<Tuple2<String,
Object>>(connection));
+			fail("this should fail with an exception");
+		}
+		catch (Exception e) {
+			// expected
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 4b6d2a3..6ae943d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{ActorSystem, Props}
+import akka.actor.{ActorRef, Props, ActorSystem}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -46,9 +46,20 @@ FlinkMiniCluster(userConfiguration, singleActorSystem) {
     cfg
   }
 
-  override def startJobManager(implicit system: ActorSystem) = {
-    system.actorOf(Props(new JobManager(configuration) with TestingJobManager),
-      JobManager.JOB_MANAGER_NAME)
+  override def startJobManager(implicit actorSystem: ActorSystem): ActorRef = {
+
+    val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _ ,
+        executionRetries, delayBetweenRetries,
+        timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
+
+    val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
+    val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
+
+    val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
+      libraryCacheManager, archive, accumulatorManager, None, executionRetries,
+      delayBetweenRetries, timeout) with TestingJobManager)
+
+    actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
   }
 
   override def startTaskManager(index: Int)(implicit system: ActorSystem) = {

http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 0d7ce60..15c55ea 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -52,8 +52,6 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
   val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
     collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
 
-  override def archiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
-
   abstract override def receiveWithLogMessages: Receive = {
     receiveTestingMessages orElse super.receiveWithLogMessages
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 6f70cb2..2db1d2b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{ActorRef, Props, ActorSystem}
+import akka.actor.{Props, ActorRef, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
 import com.typesafe.config.ConfigFactory
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import scala.concurrent.duration._
 
@@ -67,7 +67,18 @@ object TestingUtils {
   def startTestingJobManager(implicit system: ActorSystem): ActorRef = {
     val config = new Configuration()
 
-    system.actorOf(Props(new JobManager(config) with TestingJobManager))
+    val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _ ,
+        executionRetries, delayBetweenRetries,
+        timeout, archiveCount) = JobManager.createJobManagerComponents(config)
+
+    val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
+    val archive = system.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
+
+    val jobManagerProps = Props(new JobManager(config, instanceManager, scheduler,
+      libraryCacheManager, archive, accumulatorManager, None, executionRetries,
+      delayBetweenRetries, timeout) with TestingJobManager)
+
+    system.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
   }
 
   def startTestingTaskManager(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef
= {

http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 88ce698..23975e2 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -69,11 +69,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
       config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
     }
 
-    val localExecution = if(numTaskManagers == 1){
-      true
-    }else{
-      false
-    }
+    val localExecution = numTaskManagers == 1
 
     val (connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConnectionConfig) =
       TaskManager.parseConfiguration(HOSTNAME, config, singleActorSystem, localExecution)

http://git-wip-us.apache.org/repos/asf/flink/blob/1dafd810/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index cf7adaf..62b7caf 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -167,8 +167,8 @@ object ApplicationMaster {
   }
 
   def startJobManager(currDir: String, hostname: String, dynamicPropertiesEncodedString:
String,
-                       jobManagerWebPort: Int, logDirs: String):
-    (ActorSystem, ActorRef) = {
+                       jobManagerWebPort: Int, logDirs: String): (ActorSystem, ActorRef)
= {
+
     LOG.info("Start job manager for yarn")
     val args = Array[String]("--configDir", currDir)
 
@@ -185,10 +185,28 @@ object ApplicationMaster {
     configuration.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
 
     // set port to 0 to let Akka automatically determine the port.
-   implicit val jobManagerSystem = AkkaUtils.createActorSystem(configuration, Some((hostname,
0)))
+    val jobManagerSystem = AkkaUtils.createActorSystem(configuration, Some((hostname, 0)))
+
+    LOG.info("Start job manager actor");
+
+    // start all the components inside the job manager
+    val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
+                   profilerProps, executionRetries, delayBetweenRetries,
+                   timeout, _) = JobManager.createJobManagerComponents(configuration)
+
+    // start the profiler, if needed
+    val profiler: Option[ActorRef] =
+      profilerProps.map( props => jobManagerSystem.actorOf(props, JobManager.PROFILER_NAME)
)
+
+    // start the archiver
+    val archiver: ActorRef = jobManagerSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
+
+    val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
+      libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
+      delayBetweenRetries, timeout) with WithWebServer with YarnJobManager)
+
+    val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)
 
-    LOG.info("Start job manager actor.")
-    (jobManagerSystem, JobManager.startActor(Props(new JobManager(configuration) with
-      WithWebServer with YarnJobManager)))
+    (jobManagerSystem, jobManager)
   }
 }


Mime
View raw message