flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] azagrebin commented on a change in pull request #6701: [FLINK-10349] Unify stopActor utils
Date Tue, 18 Sep 2018 10:09:50 GMT
azagrebin commented on a change in pull request #6701: [FLINK-10349] Unify stopActor utils
URL: https://github.com/apache/flink/pull/6701#discussion_r218365880
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
 ##########
 @@ -87,12 +89,52 @@
 		return FutureUtils.completeAll(terminationFutures);
 	}
 
-	public static void stopActor(AkkaActorGateway akkaActorGateway) {
-		stopActor(akkaActorGateway.actor());
-	}
+	// ---------- Utils to stop an actor ----------
+
+	private static final FiniteDuration DEFAULT_TIMEOUT = FiniteDuration.apply(1, TimeUnit.MINUTES);
 
 	public static void stopActor(ActorRef actorRef) {
-		actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+		if (actorRef != null) {
+			actorRef.tell(Kill.getInstance(), ActorRef.noSender());
+		}
+	}
+
+	public static void stopActor(ActorGateway actorGateway) {
+		if (actorGateway != null) {
+			stopActor(actorGateway.actor());
+		}
+	}
+
+	public static void stopActorGracefully(ActorRef actorRef) {
+		stopActorsGracefully(actorRef);
+	}
+
+	public static void stopActorGracefully(ActorGateway actorGateway) {
+		stopActorGracefully(actorGateway.actor());
+	}
+
+	public static void stopActorsGracefully(@Nonnull ActorRef... actorRefs) {
+		List<CompletableFuture<?>> futures = new ArrayList<>(actorRefs.length);
+
+		for (ActorRef actorRef : actorRefs) {
+			if (actorRef != null) {
+				futures.add(FutureUtils.toJava(Patterns.gracefulStop(actorRef, DEFAULT_TIMEOUT)));
+			}
+		}
+
+		FutureUtils.waitForAll(futures);
 
 Review comment:
   I think the previous implementation of `stopActorsGracefully(actorRefs)` in `TestingUtils`
really blocked and waited with timeout. `FutureUtils.waitForAll` returns just a `ConjunctFuture`
which is currently not blocked upon.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message