flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-6341] [jm] Add test case to guard against RM registration loop
Date Fri, 28 Apr 2017 16:12:11 GMT
[FLINK-6341] [jm] Add test case to guard against RM registration loop


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/591841f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/591841f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/591841f3

Branch: refs/heads/master
Commit: 591841f30f7a7652b6f418abde17c7f23becf1c0
Parents: 2383839
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Apr 28 15:21:56 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Apr 28 17:59:28 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/JobManagerOptions.java  |  10 ++
 .../messages/ReconnectResourceManager.java      |  18 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |  24 +++--
 .../runtime/jobmanager/JobManagerTest.java      | 105 ++++++++++++++++++-
 4 files changed, 138 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index d129405..10d9e16 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -72,6 +72,16 @@ public class JobManagerOptions {
 			.defaultValue(16)
 			.withDeprecatedKeys("job-manager.max-attempts-history-size");
 
+	/**
+	 * This option specifies the interval in order to trigger a resource manager reconnection
if the connection
+	 * to the resource manager has been lost.
+	 *
+	 * This option is only intended for internal use.
+	 */
+	public static final ConfigOption<Long> RESOURCE_MANAGER_RECONNECT_INTERVAL =
+		key("jobmanager.resourcemanager.reconnect-interval")
+		.defaultValue(2000L);
+
 	// ------------------------------------------------------------------------
 	//  JobManager web UI
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
index d02193e..1f852e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/ReconnectResourceManager.java
@@ -22,7 +22,7 @@ import akka.actor.ActorRef;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
 import org.apache.flink.util.Preconditions;
 
-import java.util.UUID;
+import java.io.Serializable;
 
 /**
  * This message signals that the ResourceManager should reconnect to the JobManager. It is
processed
@@ -30,28 +30,30 @@ import java.util.UUID;
  * the ResourceManager to go through the reconciliation phase to sync up with the JobManager
bookkeeping.
  * This is done by forcing the ResourceManager to reconnect.
  */
-public class ReconnectResourceManager implements RequiresLeaderSessionID, java.io.Serializable
{
+public class ReconnectResourceManager implements RequiresLeaderSessionID, Serializable {
 	private static final long serialVersionUID = 1L;
 
 	private final ActorRef resourceManager;
 
-	private final UUID currentConnID;
+	private final long connectionId;
 
-	public ReconnectResourceManager(ActorRef resourceManager, UUID currentConnID) {
+	public ReconnectResourceManager(ActorRef resourceManager, long connectionId) {
 		this.resourceManager = Preconditions.checkNotNull(resourceManager);
-		this.currentConnID = Preconditions.checkNotNull(currentConnID);
+		this.connectionId = Preconditions.checkNotNull(connectionId);
 	}
 	
 	public ActorRef resourceManager() {
 		return resourceManager;
 	}
 
-	public UUID connID() {
-		return currentConnID;
+	public long getConnectionId() {
+		return connectionId;
 	}
 
 	@Override
 	public String toString() {
-		return "ReconnectResourceManager " + resourceManager.path();
+		return "ReconnectResourceManager(" +
+			resourceManager.path() + ", " +
+			connectionId + ')';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index da9df2b..b01ddc0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -178,10 +178,14 @@ class JobManager(
   /** The resource manager actor responsible for allocating and managing task manager resources.
*/
   var currentResourceManager: Option[ActorRef] = None
 
-  var currentRMConnID: UUID = null
+  var currentResourceManagerConnectionId: Long = 0
 
   val taskManagerMap = mutable.Map[ActorRef, InstanceID]()
 
+  val triggerResourceManagerReconnectInterval = new FiniteDuration(
+    flinkConfiguration.getLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL),
+    TimeUnit.MILLISECONDS)
+
   /**
    * Run when the job manager is started. Simply logs an informational message.
    * The method also starts the leader election service.
@@ -339,7 +343,7 @@ class JobManager(
 
       // ditch current resource manager (if any)
       currentResourceManager = Option(msg.resourceManager())
-      currentRMConnID = UUID.randomUUID()
+      currentResourceManagerConnectionId += 1
 
       val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
         instance => instance.getTaskManagerID).toList.asJava
@@ -358,24 +362,25 @@ class JobManager(
       def reconnectRepeatedly(): Unit = {
         msg.resourceManager() ! decorateMessage(new TriggerRegistrationAtJobManager(self))
         // try again after some delay
-        context.system.scheduler.scheduleOnce(2 seconds) {
+        context.system.scheduler.scheduleOnce(triggerResourceManagerReconnectInterval) {
           self ! decorateMessage(msg)
         }(context.dispatcher)
       }
 
       currentResourceManager match {
-        case Some(rm) if rm.equals(msg.resourceManager()) && currentRMConnID.equals(msg.connID())
=>
+        case Some(rm) if rm.equals(msg.resourceManager()) &&
+          currentResourceManagerConnectionId == msg.getConnectionId =>
           // we should ditch the current resource manager
           log.debug(s"Disconnecting resource manager $rm and forcing a reconnect.")
           currentResourceManager = None
           reconnectRepeatedly()
-        case Some(rm) =>
-          // we have registered with another ResourceManager in the meantime, stop sending
-          // TriggerRegistrationAtJobManager messages to the old ResourceManager
         case None =>
           log.warn(s"No resource manager ${msg.resourceManager()} connected. " +
             s"Telling old ResourceManager to register again.")
           reconnectRepeatedly()
+        case _ =>
+        // we have established a new connection to a ResourceManager in the meantime, stop
sending
+        // TriggerRegistrationAtJobManager messages to the old ResourceManager
       }
 
     case msg @ RegisterTaskManager(
@@ -399,7 +404,10 @@ class JobManager(
                 case _ =>
                   log.warn("Failure while asking ResourceManager for RegisterResource. Retrying",
t)
               }
-              self ! decorateMessage(new ReconnectResourceManager(rm, currentRMConnID))
+              self ! decorateMessage(
+                new ReconnectResourceManager(
+                  rm,
+                  currentResourceManagerConnectionId))
           }(context.dispatcher)
 
         case None =>

http://git-wip-us.apache.org/repos/asf/flink/blob/591841f3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index d7fc71d..6316bfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -18,18 +18,22 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
+import akka.actor.*;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -40,6 +44,7 @@ import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -64,6 +69,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
+import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
@@ -73,6 +79,7 @@ import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -89,11 +96,15 @@ import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.util.TestLogger;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
@@ -107,6 +118,7 @@ import java.io.File;
 import java.net.InetAddress;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
 import static org.apache.flink.runtime.messages.JobManagerMessages.JobResultSuccess;
@@ -121,6 +133,9 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
 
 public class JobManagerTest extends TestLogger {
 
@@ -1261,4 +1276,88 @@ public class JobManagerTest extends TestLogger {
 			}
 		}
 	}
+
+	/**
+	 * This tests makes sure that triggering a reconnection from the ResourceManager will stop
after a new
+	 * ResourceManager has connected. Furthermore it makes sure that there is not endless loop
of reconnection
+	 * commands (see FLINK-6341).
+	 */
+	@Test
+	public void testResourceManagerConnection() throws TimeoutException, InterruptedException
{
+		FiniteDuration testTimeout = new FiniteDuration(30L, TimeUnit.SECONDS);
+		final long reconnectionInterval = 200L;
+
+		final Configuration configuration = new Configuration();
+		configuration.setLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL, reconnectionInterval);
+
+
+		final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(configuration);
+
+		try {
+			final ActorGateway jmGateway = TestingUtils.createJobManager(
+				actorSystem,
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				configuration);
+
+			final TestProbe probe = TestProbe.apply(actorSystem);
+			final AkkaActorGateway rmGateway = new AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+			// wait for the JobManager to become the leader
+			Future<?> leaderFuture = jmGateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(),
testTimeout);
+			Await.ready(leaderFuture, testTimeout);
+
+			jmGateway.tell(new RegisterResourceManager(probe.ref()), rmGateway);
+
+			JobManagerMessages.LeaderSessionMessage leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
+
+			assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, leaderSessionMessage.leaderSessionID());
+			assertTrue(leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful);
+
+			jmGateway.tell(
+				new RegistrationMessages.RegisterTaskManager(
+					ResourceID.generate(),
+					mock(TaskManagerLocation.class),
+					new HardwareDescription(1, 1L, 1L, 1L),
+					1));
+			leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
+
+			assertTrue(leaderSessionMessage.message() instanceof NotifyResourceStarted);
+
+			// fail the NotifyResourceStarted so that we trigger the reconnection process on the JobManager's
side
+			probe.lastSender().tell(new Status.Failure(new Exception("Test exception")), ActorRef.noSender());
+
+			Deadline reconnectionDeadline = new FiniteDuration(5L * reconnectionInterval, TimeUnit.MILLISECONDS).fromNow();
+			boolean registered = false;
+
+			while (reconnectionDeadline.hasTimeLeft()) {
+				try {
+					leaderSessionMessage = probe.expectMsgClass(reconnectionDeadline.timeLeft(), JobManagerMessages.LeaderSessionMessage.class);
+				} catch (AssertionError ignored) {
+					// expected timeout after the reconnectionDeadline has been exceeded
+					continue;
+				}
+
+				if (leaderSessionMessage.message() instanceof TriggerRegistrationAtJobManager) {
+					if (registered) {
+						fail("A successful registration should not be followed by another TriggerRegistrationAtJobManager
message.");
+					}
+
+					jmGateway.tell(new RegisterResourceManager(probe.ref()), rmGateway);
+				} else if (leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful)
{
+					// now we should no longer receive TriggerRegistrationAtJobManager messages
+					registered = true;
+				} else {
+					fail("Received unknown message: " + leaderSessionMessage.message() + '.');
+				}
+			}
+
+			assertTrue(registered);
+
+		} finally {
+			// cleanup the actor system and with it all of the started actors if not already terminated
+			actorSystem.shutdown();
+			actorSystem.awaitTermination();
+		}
+	}
 }


Mime
View raw message