flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-1352] [runtime] Fix buggy registration of TaskManager to JobManager by introducing dedicated RefuseRegistration messages
Date Tue, 27 Jan 2015 13:49:34 GMT
Repository: flink
Updated Branches:
  refs/heads/master a5150a90c -> 517289dc5


[FLINK-1352] [runtime] Fix buggy registration of TaskManager to JobManager by introducing
dedicated RefuseRegistration messages

Adds exponential backoff strategy for TaskManager registration. Introduces AlreadyRegistered
and RefuseRegistration messages.

This closes #328.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/730e056a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/730e056a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/730e056a

Branch: refs/heads/master
Commit: 730e056a2a2ea028495637b633396392c31337e3
Parents: a5150a9
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Jan 21 12:45:12 2015 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Jan 27 14:33:27 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  10 ++
 .../flink/runtime/instance/InstanceManager.java |   4 +
 .../flink/runtime/jobmanager/JobManager.scala   |  14 +-
 .../runtime/messages/RegistrationMessages.scala |  16 +++
 .../flink/runtime/taskmanager/TaskManager.scala | 139 +++++++++++++------
 .../taskmanager/TaskManagerConfiguration.scala  |   4 +-
 .../TaskManagerRegistrationITCase.scala         | 107 ++++++++++++++
 .../runtime/testingUtils/TestingUtils.scala     |  20 ++-
 .../apache/flink/yarn/ApplicationMaster.scala   |   4 +
 9 files changed, 266 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 969329e..a2f2c83 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -170,6 +170,11 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = "taskmanager.debug.memory.logIntervalMs";
 
 	/**
+	 *
+	 */
+	public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
+
+	/**
 	 * Parameter for the maximum fan for out-of-core algorithms.
 	 * Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
 	 * for hybrid hash joins. 
@@ -488,6 +493,11 @@ public final class ConfigConstants {
 	 * The default number of task slots per task manager
 	 */
 	public static final int DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS = -1;
+
+	/**
+	 * The default task manager's maximum registration duration
+	 */
+	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
 	
 	/**
 	 * The default value for the JobClient's polling interval. 2 Seconds.

http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 3ce3ac7..64a761b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -226,6 +226,10 @@ public class InstanceManager {
 			return new HashSet<Instance>(registeredHostsById.values());
 		}
 	}
+
+	public Instance getRegisteredInstance(ActorRef ref) {
+		return registeredHostsByConnection.get(ref);
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 87c9745..c4bb793 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -122,10 +122,16 @@ class JobManager(val configuration: Configuration)
       val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
         hardwareInformation, numberOfSlots)
 
-      // to be notified when the taskManager is no longer reachable
-      context.watch(taskManager)
-
-      taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
+      // TaskManager is already registered
+      if(instanceID == null){
+        val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
+        taskManager ! AlreadyRegistered(instanceID, libraryCacheManager.getBlobServerPort)
+      } else {
+        // to be notified when the taskManager is no longer reachable
+        context.watch(taskManager)
+
+        taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
+      }
     }
 
     case RequestNumberRegisteredTaskManager => {

http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index 3a556dd..8d30741 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -43,4 +43,20 @@ object RegistrationMessages {
    */
   case class AcknowledgeRegistration(instanceID: InstanceID, blobPort: Int)
 
+  /**
+   * Denotes that the TaskManager has already been registered at the JobManager.
+   *
+   * @param instanceID
+   * @param blobPort
+   */
+  case class AlreadyRegistered(instanceID: InstanceID, blobPort: Int)
+
+  /**
+   * Denotes the unsuccessful registration of a task manager at the job manager. This is the
+   * response triggered by the [[RegisterTaskManager]] message.
+   *
+   * @param reason Reason why the task manager registration was refused
+   */
+  case class RefuseRegistration(reason: String)
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0e953c2..17bc0ac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -46,7 +46,8 @@ import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobID}
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
-import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, RegisterTaskManager}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
+RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{MonitorTask, RegisterProfilingListener, UnmonitorTask}
 import org.apache.flink.runtime.net.NetUtils
@@ -90,11 +91,11 @@ import scala.collection.JavaConverters._
   log.info("Creating {} task slot(s).", numberOfSlots)
   log.info("TaskManager connection information {}.", connectionInfo)
 
-  val REGISTRATION_DELAY = 0 seconds
-  val REGISTRATION_INTERVAL = 10 seconds
-  val MAX_REGISTRATION_ATTEMPTS = 10
   val HEARTBEAT_INTERVAL = 5000 millisecond
 
+  var registrationDelay = 50 milliseconds
+  var registrationDuration = 0 seconds
+
   TaskManager.checkTempDirs(tmpDirPaths)
   val ioManager = new IOManagerAsync(tmpDirPaths)
   val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
@@ -121,7 +122,6 @@ import scala.collection.JavaConverters._
 
   var libraryCacheManager: LibraryCacheManager = null
   var networkEnvironment: Option[NetworkEnvironment] = None
-  var registrationScheduler: Option[Cancellable] = None
   var registrationAttempts: Int = 0
   var registered: Boolean = false
   var currentJobManager = ActorRef.noSender
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
 
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
 
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
 
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
 
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
 
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
 
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
+          log.debug("The TaskManager {} is already registered at the JobManager {}, but received " +
+            "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
+        }
+      }
+    }
 
-        heartbeatScheduler = Some(context.system.scheduler.schedule(
-          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
+    case AlreadyRegistered(id, blobPort) =>
+      if(!registered) {
+        log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" +
+          "though it has not yet finished the registration process.", self.path, sender.path)
 
-        profiler foreach {
-          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
+        finishRegistration(id, blobPort)
+        registered = true
+      } else {
+        // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled){
+          log.debug("The TaskManager {} has already been registered at the JobManager {}.",
+            self.path, sender.path)
         }
+      }
 
-        for (listener <- waitForRegistration) {
-          listener ! RegisteredAtJobManager
-        }
+    case RefuseRegistration(reason) =>
+      if(!registered) {
+        log.error("The registration of task manager {} was refused by the job manager {} " +
+          "because {}.", self.path, jobManagerAkkaURL, reason)
 
-        waitForRegistration.clear()
+        // Shut task manager down
+        self ! PoisonPill
+      } else {
+        // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled) {
+          log.debug("Received RefuseRegistration from the JobManager even though being already " +
+            "registered")
+        }
       }
-    }
 
     case SubmitTask(tdd) => {
       submitTask(tdd)
@@ -454,7 +471,34 @@ import scala.collection.JavaConverters._
     }
   }
 
-  def setupNetworkEnvironment(): Unit = {
+  private def finishRegistration(id: InstanceID, blobPort: Int): Unit = {
+    currentJobManager = sender
+    instanceID = id
+
+    context.watch(currentJobManager)
+
+    log.info(s"TaskManager successfully registered at JobManager ${
+      currentJobManager.path.toString
+    }.")
+
+    setupNetworkEnvironment()
+    setupLibraryCacheManager(blobPort)
+
+    heartbeatScheduler = Some(context.system.scheduler.schedule(
+      TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
+
+    profiler foreach {
+      _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
+    }
+
+    for (listener <- waitForRegistration) {
+      listener ! RegisteredAtJobManager
+    }
+
+    waitForRegistration.clear()
+  }
+
+  private def setupNetworkEnvironment(): Unit = {
     //shutdown existing network environment
     networkEnvironment foreach {
       ne =>
@@ -730,8 +774,13 @@ object TaskManager {
     val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
       ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
 
+    val maxRegistrationDuration = Duration(configuration.getString(
+      ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+      ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION))
+
     val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize,
-      tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout)
+      tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout,
+      maxRegistrationDuration)
 
     (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index a6a76a3..82cbe9e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.taskmanager
 
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.{Duration, FiniteDuration}
 
 case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int,
                                     tmpDirPaths: Array[String], cleanupInterval: Long,
                                     memoryLogggingIntervalMs: Option[Long],
                                     profilingInterval: Option[Long],
-                                    timeout: FiniteDuration)
+                                    timeout: FiniteDuration, maxRegistrationDuration: Duration)

http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
new file mode 100644
index 0000000..e4c1efb
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import java.net.InetAddress
+
+import akka.actor._
+import akka.testkit.{TestKit, ImplicitSender}
+import org.apache.flink.runtime.instance.{InstanceID, HardwareDescription, InstanceConnectionInfo}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
+RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+class TaskManagerRegistrationITCase(_system: ActorSystem) extends TestKit(_system) with
+ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The JobManager" should {
+    "notify already registered TaskManagers" in {
+
+      val jm = TestingUtils.startTestingJobManager
+
+      val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
+      val hardwareDescription = HardwareDescription.extractFromSystem(10)
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1)
+          jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1)
+
+          expectMsgType[AcknowledgeRegistration]
+          expectMsgType[AlreadyRegistered]
+        }
+      } finally {
+        jm ! Kill
+      }
+    }
+  }
+
+  "The TaskManager" should {
+    "shutdown if its registration is refused by the JobManager" in {
+
+      val tm = TestingUtils.startTestingTaskManager(self)
+
+      watch(tm)
+
+      try{
+        within(TestingUtils.TESTING_DURATION) {
+          expectMsgType[RegisterTaskManager]
+          tm ! RefuseRegistration("Testing connection refusal")
+
+          expectTerminated(tm)
+        }
+      }
+    }
+
+    "ignore RefuseRegistration messages after it has been successfully registered" in {
+
+      val tm = TestingUtils.startTestingTaskManager(self)
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          expectMsgType[RegisterTaskManager]
+
+          tm ! AcknowledgeRegistration(new InstanceID(), 42)
+
+          tm ! RefuseRegistration("Should be ignored")
+
+          // Check if the TaskManager is still alive
+          tm ! Identify
+
+          expectMsgPF() {
+            // wait for actor identity
+            case x: ActorIdentity => true
+            // ignore heartbeats
+            case h: Heartbeat => false
+          }
+        }
+      } finally {
+        tm ! Kill
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 014a9ed..b658279 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{Props, ActorSystem}
+import akka.actor.{ActorRef, Props, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
 import com.typesafe.config.ConfigFactory
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
@@ -26,6 +26,7 @@ import org.apache.flink.core.io.IOReadableWritable
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.akka.serialization.IOReadableWritableSerializer
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
+import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import scala.concurrent.duration._
@@ -98,6 +99,23 @@ object TestingUtils {
       networkConnectionConfig) with TestingTaskManager))
   }
 
+  def startTestingJobManager(implicit system: ActorSystem): ActorRef = {
+    val config = new Configuration()
+
+    system.actorOf(Props(new JobManager(config) with TestingJobManager))
+  }
+
+  def startTestingTaskManager(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = {
+    val jmURL = jobManager.path.toString
+    val config = new Configuration()
+    config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, jmURL)
+    val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =
+      TaskManager.parseConfiguration("LOCALHOST", config)
+
+    system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
+      networkConnectionConfig) with TestingTaskManager))
+  }
+
   def startTestingCluster(numSlots: Int, numTMs: Int = 1, timeout: Int = DEFAULT_AKKA_ASK_TIMEOUT):
   FlinkMiniCluster = {
     val config = new Configuration()

http://git-wip-us.apache.org/repos/asf/flink/blob/730e056a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 37ae5ed..7c72ef4 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -39,6 +39,7 @@ object ApplicationMaster {
 
   val CONF_FILE = "flink-conf.yaml"
   val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
+  val MAX_REGISTRATION_DURATION = "5 minutes"
 
   def main(args: Array[String]): Unit ={
     val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME)
@@ -148,6 +149,9 @@ object ApplicationMaster {
         s"${ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY}: ${slots*taskManagerCount}")
     }
 
+    output.println(s"${ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION}: " +
+      s"$MAX_REGISTRATION_DURATION")
+
     // add dynamic properties
     val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
 


Mime
View raw message