flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-3347] [akka] Add QuarantineMonitor which shuts a quarantined actor system and JVM down
Date Mon, 06 Mar 2017 13:38:42 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 32a55a246 -> 1ea252a41


[FLINK-3347] [akka] Add QuarantineMonitor which shuts a quarantined actor system and JVM down

The QuarantineMonitor subscribes to the actor system's event bus and listens to
AssociationErrorEvents. These are the events which are generated when the actor system
has quarantined another actor system or when it has been quarantined by another actor
system. When the quarantined state is detected, the actor system is shut down, which kills
all actors, and then the JVM is terminated.

Disable the QuarantineMonitor by default; reintroduce a config option for activating it

This closes #3363.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ea252a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ea252a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ea252a4

Branch: refs/heads/release-1.2
Commit: 1ea252a411e5f10feed22073ad09f7050fc2f0a8
Parents: 32a55a2
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Oct 27 00:24:12 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Mar 6 14:38:01 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 .../flink/configuration/TaskManagerOptions.java |  11 +
 .../runtime/akka/DefaultQuarantineHandler.java  |  76 +++++
 .../flink/runtime/akka/QuarantineHandler.java   |  46 +++
 .../flink/runtime/akka/QuarantineMonitor.java   | 100 ++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  15 +-
 .../runtime/akka/QuarantineMonitorTest.java     | 326 +++++++++++++++++++
 7 files changed, 575 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ea252a4/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 5b00086..10a86a7 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -273,6 +273,8 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `task.cancellation-interval`: Time interval between two successive task cancellation attempts in milliseconds (DEFAULT: **30000**).
 
+- `taskmanager.exit-on-fatal-akka-error`: Whether the TaskManager process shall be terminated in case of a fatal Akka error, i.e. when its actor system has been quarantined or has quarantined another actor system. (DEFAULT: **false**)
+
 ### Distributed Coordination (via Akka)
 
 - `akka.ask.timeout`: Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT: **10 s**).

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea252a4/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 6f6238b..3bd15fe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -81,6 +81,17 @@ public class TaskManagerOptions {
 			key("task.checkpoint.alignment.max-size")
 			.defaultValue(-1L);
 
+	/**
+	 * Whether the quarantine monitor for task managers shall be started. The quarantine monitor
+	 * shuts down the actor system if it detects that it has quarantined another actor system
+	 * or if it has been quarantined by another actor system.
+	 *
+	 * <p>Configured via the key {@code taskmanager.exit-on-fatal-akka-error}. Disabled by default
+	 * ({@code false}) so that the default behaviour of earlier releases is kept.
+	 *
+	 * @deprecated Only introduced in 1.2.1 to not change the default behaviour
+	 */
+	public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
+		key("taskmanager.exit-on-fatal-akka-error")
+		.defaultValue(false);
+
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea252a4/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
new file mode 100644
index 0000000..378cb25
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Default quarantine handler which logs the quarantining events, then shuts down the given
+ * actor system by sending Kill to all actors and then shutting the JVM down with the given
+ * exit code.
+ */
+public class DefaultQuarantineHandler implements QuarantineHandler {
+
+	private final FiniteDuration timeout;
+	private final int exitCode;
+	private final Logger log;
+
+	public DefaultQuarantineHandler(Time timeout, int exitCode, Logger log) {
+		Preconditions.checkNotNull(timeout);
+		this.timeout = new FiniteDuration(timeout.getSize(), timeout.getUnit());
+		this.exitCode = exitCode;
+		this.log = Preconditions.checkNotNull(log);
+	}
+
+	@Override
+	public void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem) {
+		Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+		log.error("The actor system {} has been quarantined by {}. Shutting the actor system "
+
+			"down to be able to reestablish a connection!", actorSystemAddress, remoteSystem);
+
+		shutdownActorSystem(actorSystem);
+	}
+
+	@Override
+	public void hasQuarantined(String remoteSystem, ActorSystem actorSystem) {
+		Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+		log.error("The actor system {} has quarantined the remote actor system {}. Shutting " +
+			"the actor system down to be able to reestablish a connection!", actorSystemAddress, remoteSystem);
+
+		shutdownActorSystem(actorSystem);
+	}
+
+	private void shutdownActorSystem(ActorSystem actorSystem) {
+		// shut the actor system down
+		actorSystem.shutdown();
+
+		try {
+			// give it some time to complete the shutdown
+			actorSystem.awaitTermination(timeout);
+		} finally {
+			// now let's crash the JVM
+			System.exit(exitCode);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea252a4/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
new file mode 100644
index 0000000..21623e8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorSystem;
+
+/**
+ * Callback interface for the {@link QuarantineMonitor}, which is called in case the actor system
+ * has been quarantined or has quarantined another system.
+ */
+public interface QuarantineHandler {
+
+	/**
+	 * Callback when the given actor system was quarantined by the given remote actor system.
+	 *
+	 * @param remoteSystem is the address of the remote actor system which has quarantined this
+	 *                     actor system; the {@link QuarantineMonitor} passes {@code "Unknown"}
+	 *                     if the address could not be extracted from the association error
+	 * @param actorSystem which has been quarantined
+	 */
+	void wasQuarantinedBy(final String remoteSystem, final ActorSystem actorSystem);
+
+	/**
+	 * Callback when the given actor system has quarantined the given remote actor system.
+	 *
+	 * @param remoteSystem is the address of the remote actor system which has been quarantined
+	 *                     by our actor system; the {@link QuarantineMonitor} passes
+	 *                     {@code "Unknown"} if the address could not be extracted
+	 * @param actorSystem which has quarantined the other actor system
+	 */
+	void hasQuarantined(final String remoteSystem, final ActorSystem actorSystem);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea252a4/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
new file mode 100644
index 0000000..de82f29
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.UntypedActor;
+import akka.remote.AssociationErrorEvent;
+import akka.remote.transport.Transport;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The quarantine monitor subscribes to the event bus of the actor system in which it was
started.
+ * It listens to {@link AssociationErrorEvent} which contain information if we got quarantined
+ * or quarantine another remote actor system. If the actor detects that the actor system
has been
+ * quarantined or quarantined another system, then the {@link QuarantineHandler} is called.
+ *
+ * IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version
the
+ * quarantine state might be detected differently.
+ */
+public class QuarantineMonitor extends UntypedActor {
+
+	private static final Pattern pattern = Pattern.compile("^Invalid address:\\s+(.*)$");
+
+	private static final String QUARANTINE_MSG = "The remote system has a UID that has been
quarantined. Association aborted.";
+	private static final String QUARANTINED_MSG = "The remote system has quarantined this system.
No further associations to the remote system are possible until this system is restarted.";
+
+	private final QuarantineHandler handler;
+	private final Logger log;
+
+	public QuarantineMonitor(QuarantineHandler handler, Logger log) {
+		this.handler = Preconditions.checkNotNull(handler);
+		this.log = Preconditions.checkNotNull(log);
+	}
+
+	@Override
+	public void preStart() {
+		getContext().system().eventStream().subscribe(getSelf(), AssociationErrorEvent.class);
+	}
+
+	@Override
+	public void onReceive(Object message) throws Exception {
+		if (message instanceof AssociationErrorEvent) {
+			AssociationErrorEvent associationErrorEvent = (AssociationErrorEvent) message;
+
+			// IMPORTANT: The check for the quarantining event is highly specific to Akka 2.3.7
+			// and can change with a different Akka version.
+			// It assumes the following:
+			// AssociationErrorEvent(InvalidAssociation(InvalidAssociationException(QUARANTINE(D)_MSG))
+			if (associationErrorEvent.getCause() != null) {
+				Throwable invalidAssociation = associationErrorEvent.getCause();
+				Matcher matcher = pattern.matcher(invalidAssociation.getMessage());
+
+				final String remoteSystem;
+
+				if (matcher.find()) {
+					remoteSystem = matcher.group(1);
+				} else {
+					remoteSystem = "Unknown";
+				}
+
+				if (invalidAssociation.getCause() instanceof Transport.InvalidAssociationException) {
+					Transport.InvalidAssociationException invalidAssociationException = (Transport.InvalidAssociationException)
invalidAssociation.getCause();
+
+					// don't hate the player, hate the game! That's the only way to find out if we
+					// got quarantined or quarantined another actor system in Akka 2.3.7
+					if (QUARANTINE_MSG.equals(invalidAssociationException.getMessage())) {
+						handler.hasQuarantined(remoteSystem, getContext().system());
+					} else if (QUARANTINED_MSG.equals(invalidAssociationException.getMessage())) {
+						handler.wasQuarantinedBy(remoteSystem, getContext().system());
+					} else {
+						log.debug("The invalid association exception's message could not be matched.", associationErrorEvent);
+					}
+				} else {
+					log.debug("The association error event's root cause is not of type {}.", Transport.InvalidAssociationException.class.getSimpleName(),
associationErrorEvent);
+				}
+			} else {
+				log.debug("Received association error event which did not contain a cause.", associationErrorEvent);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea252a4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 37b5e04..bc63655 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -30,13 +30,14 @@ import _root_.akka.pattern.ask
 import _root_.akka.util.Timeout
 import grizzled.slf4j.Logger
 import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory,
MemoryType}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
+import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor}
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
@@ -1782,6 +1783,18 @@ object TaskManager {
         Props(classOf[ProcessReaper], taskManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
         "TaskManager_Process_Reaper")
 
+      if (configuration.getBoolean(TaskManagerOptions.EXIT_ON_FATAL_AKKA_ERROR)) {
+        val quarantineHandler = new DefaultQuarantineHandler(
+          Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis),
+          RUNTIME_FAILURE_RETURN_CODE,
+          LOG.logger)
+
+        LOG.debug("Starting TaskManager quarantine monitor")
+        taskManagerSystem.actorOf(
+          Props(classOf[QuarantineMonitor], quarantineHandler, LOG.logger)
+        )
+      }
+
       // if desired, start the logging daemon that periodically logs the
       // memory usage information
       if (LOG.isInfoEnabled && configuration.getBoolean(

http://git-wip-us.apache.org/repos/asf/flink/blob/1ea252a4/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
new file mode 100644
index 0000000..97309a4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.OnComplete;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for the {@link QuarantineMonitor}. Two actor systems are used because one of them is
+ * quarantined: actorSystem1 uses aggressive watch failure detector settings and is expected to
+ * quarantine actorSystem2.
+ */
+public class QuarantineMonitorTest extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(QuarantineMonitorTest.class);
+
+	private static final FiniteDuration zeroDelay = new FiniteDuration(0L, TimeUnit.SECONDS);
+
+	// we need two actor systems because we're quarantining one of them
+	private static ActorSystem actorSystem1;
+	private ActorSystem actorSystem2;
+
+	@BeforeClass
+	public static void setup() {
+		Properties properties = new Properties();
+		properties.setProperty("akka.remote.watch-failure-detector.threshold", "0.00001");
+		properties.setProperty("akka.remote.watch-failure-detector.heartbeat-interval", "1 ms");
+		properties.setProperty("akka.remote.watch-failure-detector.acceptable-heartbeat-pause", "1 ms");
+		Config deathWatch = ConfigFactory.parseProperties(properties);
+		Config defaultConfig = AkkaUtils.getDefaultAkkaConfig();
+
+		actorSystem1 = AkkaUtils.createActorSystem(deathWatch.withFallback(defaultConfig));
+	}
+
+	@AfterClass
+	public static void tearDown() {
+		if (actorSystem1 != null) {
+			actorSystem1.shutdown();
+			actorSystem1.awaitTermination();
+		}
+	}
+
+	@Before
+	public void setupTest() {
+		actorSystem2 = AkkaUtils.createDefaultActorSystem();
+	}
+
+	@After
+	public void tearDownTest() {
+		if (actorSystem2 != null) {
+			actorSystem2.shutdown();
+			actorSystem2.awaitTermination();
+		}
+	}
+
+	/**
+	 * Tests that the quarantine monitor detects if an actor system has been quarantined by another
+	 * actor system.
+	 */
+	@Test(timeout = 5000L)
+	public void testWatcheeQuarantined() throws ExecutionException, InterruptedException {
+		TestingQuarantineHandler quarantineHandler = new TestingQuarantineHandler();
+
+		ActorRef watchee = null;
+		ActorRef watcher = null;
+		ActorRef monitor = null;
+
+		FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration interval = new FiniteDuration(200, TimeUnit.MILLISECONDS);
+
+		try {
+			// start the quarantine monitor in the watchee actor system
+			monitor = actorSystem2.actorOf(getQuarantineMonitorProps(quarantineHandler), "quarantineMonitor");
+
+			watchee = actorSystem2.actorOf(getWatcheeProps(timeout, interval, quarantineHandler), "watchee");
+			watcher = actorSystem1.actorOf(getWatcherProps(timeout, interval, quarantineHandler), "watcher");
+
+			final Address actorSystem1Address = AkkaUtils.getAddress(actorSystem1);
+			final String watcheeAddress = AkkaUtils.getAkkaURL(actorSystem2, watchee);
+			final String watcherAddress = AkkaUtils.getAkkaURL(actorSystem1, watcher);
+
+			// ping the watcher continuously
+			watchee.tell(new Ping(watcherAddress), ActorRef.noSender());
+			// start watching the watchee
+			watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
+
+			Future<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
+
+			// actorSystem2 must report that it has been quarantined by actorSystem1
+			Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get());
+		} finally {
+			TestingUtils.stopActor(watchee);
+			TestingUtils.stopActor(watcher);
+			TestingUtils.stopActor(monitor);
+		}
+	}
+
+	/**
+	 * Tests that the quarantine monitor detects if an actor system quarantines another actor
+	 * system.
+	 */
+	@Test(timeout = 5000L)
+	public void testWatcherQuarantining() throws ExecutionException, InterruptedException {
+		TestingQuarantineHandler quarantineHandler = new TestingQuarantineHandler();
+
+		ActorRef watchee = null;
+		ActorRef watcher = null;
+		ActorRef monitor = null;
+
+		FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration interval = new FiniteDuration(200, TimeUnit.MILLISECONDS);
+
+		try {
+			// start the quarantine monitor in the watcher actor system
+			monitor = actorSystem1.actorOf(getQuarantineMonitorProps(quarantineHandler), "quarantineMonitor");
+
+			watchee = actorSystem2.actorOf(getWatcheeProps(timeout, interval, quarantineHandler), "watchee");
+			watcher = actorSystem1.actorOf(getWatcherProps(timeout, interval, quarantineHandler), "watcher");
+
+			final Address actorSystem2Address = AkkaUtils.getAddress(actorSystem2);
+			final String watcheeAddress = AkkaUtils.getAkkaURL(actorSystem2, watchee);
+			final String watcherAddress = AkkaUtils.getAkkaURL(actorSystem1, watcher);
+
+			// ping the watcher continuously
+			watchee.tell(new Ping(watcherAddress), ActorRef.noSender());
+			// start watching the watchee
+			watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
+
+			Future<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
+
+			// actorSystem1 must report that it has quarantined actorSystem2
+			Assert.assertEquals(actorSystem2Address.toString(), quarantineFuture.get());
+		} finally {
+			TestingUtils.stopActor(watchee);
+			TestingUtils.stopActor(watcher);
+			TestingUtils.stopActor(monitor);
+		}
+	}
+
+	/**
+	 * Quarantine handler which completes a future for each callback; errors of the test actors
+	 * complete both futures exceptionally so that the waiting test fails fast.
+	 */
+	private static class TestingQuarantineHandler implements QuarantineHandler, ErrorHandler {
+
+		private final CompletableFuture<String> wasQuarantinedByFuture;
+		private final CompletableFuture<String> hasQuarantinedFuture;
+
+		public TestingQuarantineHandler() {
+			this.wasQuarantinedByFuture = new FlinkCompletableFuture<>();
+			this.hasQuarantinedFuture = new FlinkCompletableFuture<>();
+		}
+
+		@Override
+		public void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem) {
+			wasQuarantinedByFuture.complete(remoteSystem);
+		}
+
+		@Override
+		public void hasQuarantined(String remoteSystem, ActorSystem actorSystem) {
+			hasQuarantinedFuture.complete(remoteSystem);
+		}
+
+		public Future<String> getWasQuarantinedByFuture() {
+			return wasQuarantinedByFuture;
+		}
+
+		public Future<String> getHasQuarantinedFuture() {
+			return hasQuarantinedFuture;
+		}
+
+		@Override
+		public void handleError(Throwable failure) {
+			wasQuarantinedByFuture.completeExceptionally(failure);
+			hasQuarantinedFuture.completeExceptionally(failure);
+		}
+	}
+
+	private interface ErrorHandler {
+		void handleError(Throwable failure);
+	}
+
+	/**
+	 * Base class of the test actors: resolves a remote target asynchronously and, once resolved,
+	 * sends it a fixed message periodically. The periodic task is cancelled when the actor stops.
+	 */
+	abstract static class PeriodicPinger extends UntypedActor {
+
+		private final FiniteDuration timeout;
+		private final FiniteDuration interval;
+		private final ErrorHandler errorHandler;
+		private final String pingMessage;
+
+		/** Periodic ping task; only accessed from within the actor's message processing. */
+		private akka.actor.Cancellable pingTask;
+
+		PeriodicPinger(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler, String pingMessage) {
+			this.timeout = Preconditions.checkNotNull(timeout);
+			this.interval = Preconditions.checkNotNull(interval);
+			this.errorHandler = Preconditions.checkNotNull(errorHandler);
+			this.pingMessage = Preconditions.checkNotNull(pingMessage);
+		}
+
+		/**
+		 * Resolves the given actor path. The resolved reference is sent back to this actor as a
+		 * {@link Resolved} message, so that it is handled by the actor itself rather than on the
+		 * future's callback thread, where the actor context must not be accessed.
+		 */
+		void resolve(String target) {
+			final ActorRef self = getSelf();
+
+			getContext().actorSelection(target).resolveOne(timeout).onComplete(new OnComplete<ActorRef>() {
+				@Override
+				public void onComplete(Throwable failure, ActorRef success) {
+					if (success != null) {
+						self.tell(new Resolved(success), ActorRef.noSender());
+					} else {
+						errorHandler.handleError(failure);
+					}
+				}
+			}, getContext().dispatcher());
+		}
+
+		/** Starts sending the ping message to the given target periodically, with this actor as sender. */
+		void startPinging(ActorRef target) {
+			stopPinging();
+			pingTask = getContext().system().scheduler().schedule(
+				zeroDelay,
+				interval,
+				target,
+				pingMessage,
+				getContext().dispatcher(),
+				getSelf());
+		}
+
+		@Override
+		public void postStop() throws Exception {
+			// otherwise the scheduler keeps sending messages after this actor has been stopped
+			stopPinging();
+			super.postStop();
+		}
+
+		private void stopPinging() {
+			if (pingTask != null) {
+				pingTask.cancel();
+				pingTask = null;
+			}
+		}
+	}
+
+	/** Death-watches the {@link Watch} target and pings it constantly. */
+	static class Watcher extends PeriodicPinger {
+
+		Watcher(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
+			super(timeout, interval, errorHandler, "Watcher message");
+		}
+
+		@Override
+		public void onReceive(Object message) throws Exception {
+			if (message instanceof Watch) {
+				resolve(((Watch) message).getTarget());
+			} else if (message instanceof Resolved) {
+				ActorRef watchee = ((Resolved) message).getActorRef();
+				getContext().watch(watchee);
+				// constantly ping the watchee
+				startPinging(watchee);
+			}
+		}
+	}
+
+	/** Pings the {@link Ping} target constantly. */
+	static class Watchee extends PeriodicPinger {
+
+		Watchee(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
+			super(timeout, interval, errorHandler, "Watchee message");
+		}
+
+		@Override
+		public void onReceive(Object message) throws Exception {
+			if (message instanceof Ping) {
+				resolve(((Ping) message).getTarget());
+			} else if (message instanceof Resolved) {
+				// constantly ping the target
+				startPinging(((Resolved) message).getActorRef());
+			}
+		}
+	}
+
+	/** Local message carrying a resolved {@link ActorRef} back to the resolving actor. */
+	static class Resolved {
+		private final ActorRef actorRef;
+
+		Resolved(ActorRef actorRef) {
+			this.actorRef = Preconditions.checkNotNull(actorRef);
+		}
+
+		ActorRef getActorRef() {
+			return actorRef;
+		}
+	}
+
+	static class Watch {
+		private final String target;
+
+		Watch(String target) {
+			this.target = target;
+		}
+
+		public String getTarget() {
+			return target;
+		}
+	}
+
+	static class Ping {
+		private final String target;
+
+		Ping(String target) {
+			this.target = target;
+		}
+
+		public String getTarget() {
+			return target;
+		}
+	}
+
+	static Props getWatcheeProps(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
+		return Props.create(Watchee.class, timeout, interval, errorHandler);
+	}
+
+	static Props getWatcherProps(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
+		return Props.create(Watcher.class, timeout, interval, errorHandler);
+	}
+
+	static Props getQuarantineMonitorProps(QuarantineHandler handler) {
+		return Props.create(QuarantineMonitor.class, handler, LOG);
+	}
+
+}


Mime
View raw message