flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [hotfix] [tests] Harden JobManagerRegistrationTest
Date Thu, 27 Apr 2017 10:46:55 GMT
Repository: flink
Updated Branches:
  refs/heads/master 90ca43810 -> 926547582


[hotfix] [tests] Harden JobManagerRegistrationTest

The problem is that the test does not wait until the JobManager has become the leader. As a
result, the RegisterTaskManager messages it sends might get dropped.

This PR fixes the problem by waiting for the response to the NotifyWhenLeader message before
sending the registration messages.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92654758
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92654758
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92654758

Branch: refs/heads/master
Commit: 926547582d62733bdedb1cdcc277653b2ff7b4e7
Parents: 90ca438
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Apr 27 12:32:25 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Apr 27 12:43:50 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  2 +-
 .../jobmanager/JobManagerRegistrationTest.scala | 91 +++++++++++---------
 2 files changed, 52 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92654758/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 40e2c2a..2fc3ef4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -216,7 +216,7 @@ class JobManager(
       case Some(group) =>
         instantiateMetrics(group)
       case None =>
-        log.warn("Could not instantiate JobManager metrics.")
+        log.warn("Could not instantiate JobManager metric group.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92654758/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 6be8bcc..b2e8005 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -33,7 +33,8 @@ import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwa
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered,
RegisterTaskManager}
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation
-import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenLeader
+import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingUtils}
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.junit.Assert.{assertNotEquals, assertNotNull}
@@ -41,6 +42,7 @@ import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
+import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -94,39 +96,44 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll
{
         var id1: InstanceID = null
         var id2: InstanceID = null
 
+        // wait until the JobManager becomes the leader, otherwise the RegisterTaskManager messages
+        // are dropped
+        val leaderFuture = jm.ask(NotifyWhenLeader, TestingUtils.TESTING_TIMEOUT)
+        Await.ready(leaderFuture, TestingUtils.TESTING_TIMEOUT)
+
         // task manager 1
         within(10 seconds) {
-         jm.tell(
-           RegisterTaskManager(
-             resourceId1,
-             connectionInfo1,
-             hardwareDescription,
-             1),
-           new AkkaActorGateway(tm1, HighAvailabilityServices.DEFAULT_LEADER_ID))
-
-         val response = probe.expectMsgType[LeaderSessionMessage]
-         response match {
-           case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id1 = id
-           case _ => fail("Wrong response message: " + response)
-         }
-       }
+          jm.tell(
+            RegisterTaskManager(
+              resourceId1,
+              connectionInfo1,
+              hardwareDescription,
+              1),
+            new AkkaActorGateway(tm1, HighAvailabilityServices.DEFAULT_LEADER_ID))
+
+          val response = probe.expectMsgType[LeaderSessionMessage]
+          response match {
+            case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id1 = id
+            case _ => fail("Wrong response message: " + response)
+          }
+        }
 
         // task manager 2
         within(10 seconds) {
-         jm.tell(
-           RegisterTaskManager(
-             resourceId2,
-             connectionInfo2,
-             hardwareDescription,
-             1),
-           new AkkaActorGateway(tm2, HighAvailabilityServices.DEFAULT_LEADER_ID))
-
-         val response = probe.expectMsgType[LeaderSessionMessage]
-         response match {
-           case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) =>
id2 = id
-           case _ => fail("Wrong response message: " + response)
-         }
-       }
+          jm.tell(
+            RegisterTaskManager(
+              resourceId2,
+              connectionInfo2,
+              hardwareDescription,
+              1),
+            new AkkaActorGateway(tm2, HighAvailabilityServices.DEFAULT_LEADER_ID))
+
+          val response = probe.expectMsgType[LeaderSessionMessage]
+          response match {
+            case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) =>
id2 = id
+            case _ => fail("Wrong response message: " + response)
+          }
+        }
 
         assertNotNull(id1)
         assertNotNull(id2)
@@ -160,6 +167,11 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll
{
         val connectionInfo = new TaskManagerLocation(resourceID, InetAddress.getLocalHost,
1)
         val hardwareDescription = HardwareDescription.extractFromSystem(10)
 
+        // wait until the JobManager becomes the leader, otherwise the RegisterTaskManager messages
+        // are dropped
+        val leaderFuture = jm.ask(NotifyWhenLeader, TestingUtils.TESTING_TIMEOUT)
+        Await.ready(leaderFuture, TestingUtils.TESTING_TIMEOUT)
+
         within(20 seconds) {
           jm.tell(
             RegisterTaskManager(
@@ -167,7 +179,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll
{
               connectionInfo,
               hardwareDescription,
               1),
-              selfGateway)
+            selfGateway)
 
           jm.tell(
             RegisterTaskManager(
@@ -175,7 +187,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll
{
               connectionInfo,
               hardwareDescription,
               1),
-              selfGateway)
+            selfGateway)
 
           jm.tell(
             RegisterTaskManager(
@@ -183,26 +195,26 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll
{
               connectionInfo,
               hardwareDescription,
               1),
-              selfGateway)
+            selfGateway)
 
           probe.expectMsgType[LeaderSessionMessage] match {
             case LeaderSessionMessage(
-              HighAvailabilityServices.DEFAULT_LEADER_ID,
-              AcknowledgeRegistration(_, _)) =>
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AcknowledgeRegistration(_, _)) =>
             case m => fail("Wrong message type: " + m)
           }
 
           probe.expectMsgType[LeaderSessionMessage] match {
             case LeaderSessionMessage(
-              HighAvailabilityServices.DEFAULT_LEADER_ID,
-              AlreadyRegistered(_, _)) =>
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AlreadyRegistered(_, _)) =>
             case m => fail("Wrong message type: " + m)
           }
 
           probe.expectMsgType[LeaderSessionMessage] match {
             case LeaderSessionMessage(
-              HighAvailabilityServices.DEFAULT_LEADER_ID,
-              AlreadyRegistered(_, _)) =>
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AlreadyRegistered(_, _)) =>
             case m => fail("Wrong message type: " + m)
           }
         }
@@ -227,7 +239,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll
{
     // if there exists already one of these actors (e.g. JobManager has not been properly
shutdown),
     // then this will fail the JobManager creation
     val props = JobManager.getJobManagerProps(
-      classOf[JobManager],
+      classOf[TestingJobManager],
       config,
       executor,
       executor,
@@ -249,7 +261,6 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll
{
   }
 
   private def startTestingResourceManager(system: ActorSystem, jm: ActorRef): ActorGateway
= {
-    val jobManagerURL = AkkaUtils.getAkkaURL(system, jm)
     val config = new Configuration()
     val rm: ActorRef = FlinkResourceManager.startResourceManagerActors(
       config,


Mime
View raw message