flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10403) Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
Date Fri, 28 Sep 2018 14:37:06 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631938#comment-16631938 ] 

ASF GitHub Bot commented on FLINK-10403:
----------------------------------------

asfgit closed pull request #6751: [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/6751
 
 
   

This PR was merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index c9a17227f79..9eaef34a33a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -54,6 +54,7 @@
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -88,7 +89,7 @@
  *
  * <p>Specialization of this class can be used for the session mode and the per-job mode
  */
-public abstract class ClusterEntrypoint implements FatalErrorHandler {
+public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler {
 
 	public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions
 		.key("internal.cluster.execution-mode")
@@ -147,7 +148,7 @@ protected ClusterEntrypoint(Configuration configuration) {
 		return terminationFuture;
 	}
 
-	protected void startCluster() throws ClusterEntrypointException {
+	public void startCluster() throws ClusterEntrypointException {
 		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
@@ -312,6 +313,14 @@ protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
 		return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 	}
 
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return shutDownAsync(
+			ApplicationStatus.UNKNOWN,
+			"Cluster entrypoint has been closed externally.",
+			true).thenAccept(ignored -> {});
+	}
+
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		synchronized (lock) {
 			Throwable exception = null;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index 94925b2aba0..b07095c2b6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -116,6 +116,16 @@ private void registerShutDownFuture() {
 		return shutDownFuture;
 	}
 
+	@Nonnull
+	public T getDispatcher() {
+		return dispatcher;
+	}
+
+	@Nonnull
+	public WebMonitorEndpoint<?> getWebMonitorEndpoint() {
+		return webMonitorEndpoint;
+	}
+
 	@Override
 	public CompletableFuture<Void> closeAsync() {
 		if (isRunning.compareAndSet(true, false)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
new file mode 100644
index 00000000000..85d3caa26fa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.jobmanager.JobManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Dispatcher} instance running in a separate JVM.
+ */
+public class DispatcherProcess extends TestJvmProcess {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);
+
+	/** ID for this JobManager. */
+	private final int id;
+
+	/** The configuration for the JobManager. */
+	private final Configuration config;
+
+	/** Configuration parsed as args for {@link JobManagerProcess.JobManagerProcessEntryPoint}. */
+	private final String[] jvmArgs;
+
+	/**
+	 * Creates a {@link JobManager} running in a separate JVM.
+	 *
+	 * @param id     ID for the JobManager
+	 * @param config Configuration for the job manager process
+	 *
+	 * @throws Exception
+	 */
+	public DispatcherProcess(int id, Configuration config) throws Exception {
+		checkArgument(id >= 0, "Negative ID");
+		this.id = id;
+		this.config = checkNotNull(config, "Configuration");
+
+		ArrayList<String> args = new ArrayList<>();
+
+		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+			args.add("--" + entry.getKey());
+			args.add(entry.getValue());
+		}
+
+		this.jvmArgs = new String[args.size()];
+		args.toArray(jvmArgs);
+	}
+
+	@Override
+	public String getName() {
+		return "JobManager " + id;
+	}
+
+	@Override
+	public String[] getJvmArgs() {
+		return jvmArgs;
+	}
+
+	@Override
+	public String getEntryPointClassName() {
+		return DispatcherProcessEntryPoint.class.getName();
+	}
+
+	public Configuration getConfig() {
+		return config;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("JobManagerProcess(id=%d)", id);
+	}
+
+	/**
+	 * Entry point for the JobManager process.
+	 */
+	public static class DispatcherProcessEntryPoint {
+
+		private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcessEntryPoint.class);
+
+		/**
+		 * Entrypoint of the DispatcherProcessEntryPoint.
+		 *
+		 * <p>Other arguments are parsed to a {@link Configuration} and passed to the
+		 * JobManager, for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum
+		 * "xyz:123:456"</code>.
+		 */
+		public static void main(String[] args) {
+			try {
+				ParameterTool params = ParameterTool.fromArgs(args);
+				Configuration config = params.getConfiguration();
+				LOG.info("Configuration: {}.", config);
+
+				config.setInteger(JobManagerOptions.PORT, 0);
+				config.setInteger(RestOptions.PORT, 0);
+
+				final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config);
+
+				ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
+			}
+			catch (Throwable t) {
+				LOG.error("Failed to start JobManager process", t);
+				System.exit(1);
+			}
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
deleted file mode 100644
index b381f62b97e..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TaskManager} instance running in a separate JVM.
- */
-public class TaskManagerProcess extends TestJvmProcess {
-
-	/** ID for this TaskManager */
-	private final int id;
-
-	/** The configuration for the TaskManager */
-	private final Configuration config;
-
-	/** Configuration parsed as args for {@link TaskManagerProcess.TaskManagerProcessEntryPoint} */
-	private final String[] jvmArgs;
-
-	public TaskManagerProcess(int id, Configuration config) throws Exception {
-		checkArgument(id >= 0, "Negative ID");
-		this.id = id;
-		this.config = checkNotNull(config, "Configuration");
-
-		ArrayList<String> args = new ArrayList<>();
-
-		for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
-			args.add("--" + entry.getKey());
-			args.add(entry.getValue());
-		}
-
-		this.jvmArgs = new String[args.size()];
-		args.toArray(jvmArgs);
-	}
-
-	@Override
-	public String getName() {
-		return "TaskManager " + id;
-	}
-
-	@Override
-	public String[] getJvmArgs() {
-		return jvmArgs;
-	}
-
-	@Override
-	public String getEntryPointClassName() {
-		return TaskManagerProcessEntryPoint.class.getName();
-	}
-
-	public int getId() {
-		return id;
-	}
-
-	@Override
-	public String toString() {
-		return String.format("TaskManagerProcess(id=%d)", id);
-	}
-
-	/**
-	 * Entry point for the TaskManager process.
-	 */
-	public static class TaskManagerProcessEntryPoint {
-
-		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
-		/**
-		 * All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
-		 * for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum "xyz:123:456"</code>.
-		 */
-		public static void main(String[] args) throws Exception {
-			try {
-				Configuration config = ParameterTool.fromArgs(args).getConfiguration();
-
-				if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-					config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-				}
-
-				if (!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
-					config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-				}
-
-
-				LOG.info("Configuration: {}.", config);
-
-				// Run the TaskManager
-				TaskManager.selectNetworkInterfaceAndRunTaskManager(
-					config,
-					ResourceID.generate(),
-					TaskManager.class);
-
-				// Run forever
-				new CountDownLatch(1).await();
-			}
-			catch (Throwable t) {
-				LOG.error("Failed to start TaskManager process", t);
-				System.exit(1);
-			}
-		}
-	}
-
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
index 080ecf81e66..654b2bd66d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
@@ -62,4 +62,8 @@ protected void after() {
 	public int getBlobServerPort() {
 		return blobServer.getPort();
 	}
+
+	public BlobServer getBlobServer() {
+		return blobServer;
+	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 56327adaae0..5d7f26bb886 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -19,28 +19,19 @@
 package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -51,17 +42,8 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
 import static org.junit.Assert.assertFalse;
@@ -91,6 +73,9 @@
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+	@Rule
+	public final BlobServerResource blobServerResource = new BlobServerResource();
+
 	@Test
 	public void testTaskManagerProcessFailure() throws Exception {
 
@@ -98,15 +83,24 @@ public void testTaskManagerProcessFailure() throws Exception {
 		final StringWriter processOutput2 = new StringWriter();
 		final StringWriter processOutput3 = new StringWriter();
 
-		ActorSystem jmActorSystem = null;
-		HighAvailabilityServices highAvailabilityServices = null;
 		Process taskManagerProcess1 = null;
 		Process taskManagerProcess2 = null;
 		Process taskManagerProcess3 = null;
 
 		File coordinateTempDir = null;
 
-		try {
+		final int jobManagerPort = NetUtils.getAvailablePort();
+		final int restPort = NetUtils.getAvailablePort();
+
+		Configuration jmConfig = new Configuration();
+		jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+		jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+		jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
+		jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
+		jmConfig.setInteger(RestOptions.PORT, restPort);
+
+		try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig)) {
 			// check that we run this test only if the java command
 			// is available on this machine
 			String javaCommand = getJavaCommandPath();
@@ -123,37 +117,7 @@ public void testTaskManagerProcessFailure() throws Exception {
 			// coordination between the processes goes through a directory
 			coordinateTempDir = temporaryFolder.newFolder();
 
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
-			jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
-			jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
-			jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
-			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				jmConfig,
-				TestingUtils.defaultExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(
-				jmConfig,
-				jmActorSystem,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				highAvailabilityServices,
-				NoOpMetricRegistry.INSTANCE,
-				Option.empty(),
-				JobManager.class,
-				MemoryArchivist.class)._1();
+			clusterEntrypoint.startCluster();
 
 			// the TaskManager java command
 			String[] command = new String[] {
@@ -162,7 +126,7 @@ public void testTaskManagerProcessFailure() throws Exception {
 					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
 					"-Xms80m", "-Xmx80m",
 					"-classpath", getCurrentClasspath(),
-					TaskManagerProcessEntryPoint.class.getName(),
+					TaskExecutorProcessEntryPoint.class.getName(),
 					String.valueOf(jobManagerPort)
 			};
 
@@ -172,10 +136,6 @@ public void testTaskManagerProcessFailure() throws Exception {
 			taskManagerProcess2 = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000);
-
 			// the program will set a marker file in each of its parallel tasks once they are ready, so that
 			// this coordinating code is aware of this.
 			// the program will very slowly consume elements until the marker file (later created by the
@@ -188,7 +148,7 @@ public void testTaskManagerProcessFailure() throws Exception {
 				@Override
 				public void run() {
 					try {
-						testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+						testTaskManagerFailure(restPort, coordinateDirClosure);
 					}
 					catch (Throwable t) {
 						t.printStackTrace();
@@ -219,10 +179,6 @@ public void run() {
 			taskManagerProcess3 = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
 
-			// we wait for the third TaskManager to register
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000);
-
 			// kill one of the previous TaskManagers, triggering a failure and recovery
 			taskManagerProcess1.destroy();
 			taskManagerProcess1 = null;
@@ -269,13 +225,6 @@ public void run() {
 			if (taskManagerProcess3 != null) {
 				taskManagerProcess3.destroy();
 			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
-			}
-
-			if (highAvailabilityServices != null) {
-				highAvailabilityServices.closeAndCleanupAllData();
-			}
 		}
 	}
 
@@ -289,44 +238,6 @@ public void run() {
 	 */
 	public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
 
-	protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelayMillis)
-			throws Exception {
-		final long pollInterval = 10_000_000; // 10 ms = 10,000,000 nanos
-		final long deadline = System.nanoTime() + maxDelayMillis * 1_000_000;
-
-		long time;
-
-		while ((time = System.nanoTime()) < deadline) {
-			FiniteDuration timeout = new FiniteDuration(pollInterval, TimeUnit.NANOSECONDS);
-
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-
-				int numTMs = (Integer) Await.result(result, timeout);
-
-				if (numTMs == numExpected) {
-					return;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-
-			long timePassed = System.nanoTime() - time;
-			long remainingMillis = (pollInterval - timePassed) / 1_000_000;
-			if (remainingMillis > 0) {
-				Thread.sleep(remainingMillis);
-			}
-		}
-
-		fail("The TaskManagers did not register within the expected time (" + maxDelayMillis + "msecs)");
-	}
-
 	protected static void printProcessLog(String processName, String log) {
 		if (log == null || log.length() == 0) {
 			return;
@@ -387,11 +298,11 @@ protected static boolean waitForMarkerFiles(File basedir, String prefix, int num
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+	 * The entry point for the TaskExecutor JVM. Simply configures and runs a TaskExecutor.
 	 */
-	public static class TaskManagerProcessEntryPoint {
+	public static class TaskExecutorProcessEntryPoint {
 
-		private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+		private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorProcessEntryPoint.class);
 
 		public static void main(String[] args) {
 			try {
@@ -405,14 +316,7 @@ public static void main(String[] args) {
 				cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 				cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
 
-				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
-					ResourceID.generate(), TaskManager.class);
-
-				// wait forever
-				Object lock = new Object();
-				synchronized (lock) {
-					lock.wait();
-				}
+				TaskManagerRunner.runTaskManager(cfg, ResourceID.generate());
 			}
 			catch (Throwable t) {
 				LOG.error("Failed to start TaskManager process", t);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d3accffcbf8..9e9ce076197 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -22,36 +22,38 @@
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.DispatcherProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -59,12 +61,15 @@
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -90,23 +95,28 @@
 @RunWith(Parameterized.class)
 public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
-	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+	private static ZooKeeperTestEnvironment zooKeeper;
 
 	private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
 
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-	@AfterClass
-	public static void tearDown() throws Exception {
-		if (ZooKeeper != null) {
-			ZooKeeper.shutdown();
-		}
+	@BeforeClass
+	public static void setup() {
+		zooKeeper = new ZooKeeperTestEnvironment(1);
 	}
 
 	@Before
 	public void cleanUp() throws Exception {
-		ZooKeeper.deleteAll();
+		zooKeeper.deleteAll();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (zooKeeper != null) {
+			zooKeeper.shutdown();
+		}
 	}
 
 	protected static final String READY_MARKER_FILE_PREFIX = "ready_";
@@ -141,7 +151,6 @@ public JobManagerHAProcessFailureBatchRecoveryITCase(ExecutionMode executionMode
 	 */
 	private void testJobManagerFailure(String zkQuorum, final File coordinateDir, final File zookeeperStoragePath) throws Exception {
 		Configuration config = new Configuration();
-		config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, zookeeperStoragePath.getAbsolutePath());
@@ -149,7 +158,7 @@ private void testJobManagerFailure(String zkQuorum, final File coordinateDir, fi
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"leader", 1, config);
 		env.setParallelism(PARALLELISM);
-		env.setNumberOfExecutionRetries(1);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
 		env.getConfig().setExecutionMode(executionMode);
 		env.getConfig().disableSysoutLogging();
 
@@ -212,7 +221,8 @@ public void flatMap(Long value, Collector<Long> out) throws Exception {
 	}
 
 	@Test
-	public void testJobManagerProcessFailure() throws Exception {
+	public void testDispatcherProcessFailure() throws Exception {
+		final Time timeout = Time.seconds(30L);
 		final File zookeeperStoragePath = temporaryFolder.newFolder();
 
 		// Config
@@ -222,15 +232,11 @@ public void testJobManagerProcessFailure() throws Exception {
 
 		assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
 
-		// Setup
-		// Test actor system
-		ActorSystem testActorSystem;
-
 		// Job managers
-		final JobManagerProcess[] jmProcess = new JobManagerProcess[numberOfJobManagers];
+		final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers];
 
 		// Task managers
-		final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers];
+		TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers];
 
 		HighAvailabilityServices highAvailabilityServices = null;
 
@@ -239,24 +245,25 @@ public void testJobManagerProcessFailure() throws Exception {
 		// Coordination between the processes goes through a directory
 		File coordinateTempDir = null;
 
+		// Cluster config
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+			zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
+		// Task manager configuration
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
 		try {
 			final Deadline deadline = TestTimeOut.fromNow();
 
 			// Coordination directory
 			coordinateTempDir = temporaryFolder.newFolder();
 
-			// Job Managers
-			Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
-					ZooKeeper.getConnectString(), zookeeperStoragePath.getPath());
-
 			// Start first process
-			jmProcess[0] = new JobManagerProcess(0, config);
-			jmProcess[0].startProcess();
-
-			// Task manager configuration
-			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+			dispatcherProcesses[0] = new DispatcherProcess(0, config);
+			dispatcherProcesses[0].startProcess();
 
 			highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
 				config,
@@ -264,27 +271,13 @@ public void testJobManagerProcessFailure() throws Exception {
 
 			// Start the task manager process
 			for (int i = 0; i < numberOfTaskManagers; i++) {
-				tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-				TaskManager.startTaskManagerComponentsAndActor(
-					config,
-					ResourceID.generate(),
-					tmActorSystem[i],
-					highAvailabilityServices,
-					NoOpMetricRegistry.INSTANCE,
-					"localhost",
-					Option.<String>empty(),
-					false,
-					TaskManager.class);
+				taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate());
+				taskManagerRunners[i].start();
 			}
 
-			// Test actor system
-			testActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-
-			jmProcess[0].getActorRef(testActorSystem, deadline.timeLeft());
-
 			// Leader listener
 			TestingListener leaderListener = new TestingListener();
-			leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+			leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
 			leaderRetrievalService.start(leaderListener);
 
 			// Initial submission
@@ -293,13 +286,14 @@ public void testJobManagerProcessFailure() throws Exception {
 			String leaderAddress = leaderListener.getAddress();
 			UUID leaderId = leaderListener.getLeaderSessionID();
 
-			// Get the leader ref
-			ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
-			ActorGateway leaderGateway = new AkkaActorGateway(leaderRef, leaderId);
+			final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = rpcService.connect(
+				leaderAddress,
+				DispatcherId.fromUuid(leaderId),
+				DispatcherGateway.class);
+			final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get();
 
 			// Wait for all task managers to connect to the leading job manager
-			JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, leaderGateway,
-					deadline.timeLeft());
+			waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft());
 
 			final File coordinateDirClosure = coordinateTempDir;
 			final Throwable[] errorRef = new Throwable[1];
@@ -309,7 +303,7 @@ public void testJobManagerProcessFailure() throws Exception {
 				@Override
 				public void run() {
 					try {
-						testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
+						testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
 					}
 					catch (Throwable t) {
 						t.printStackTrace();
@@ -326,12 +320,10 @@ public void run() {
 					READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis());
 
 			// Kill one of the job managers and trigger recovery
-			jmProcess[0].destroy();
+			dispatcherProcesses[0].destroy();
 
-			jmProcess[1] = new JobManagerProcess(1, config);
-			jmProcess[1].startProcess();
-
-			jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft());
+			dispatcherProcesses[1] = new DispatcherProcess(1, config);
+			dispatcherProcesses[1].startProcess();
 
 			// we create the marker file which signals the program functions tasks that they can complete
 			AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
@@ -358,7 +350,7 @@ public void run() {
 			// for Travis and the root problem is not shown)
 			t.printStackTrace();
 
-			for (JobManagerProcess p : jmProcess) {
+			for (DispatcherProcess p : dispatcherProcesses) {
 				if (p != null) {
 					p.printProcessLog();
 				}
@@ -368,8 +360,8 @@ public void run() {
 		}
 		finally {
 			for (int i = 0; i < numberOfTaskManagers; i++) {
-				if (tmActorSystem[i] != null) {
-					tmActorSystem[i].shutdown();
+				if (taskManagerRunners[i] != null) {
+					taskManagerRunners[i].close();
 				}
 			}
 
@@ -377,7 +369,7 @@ public void run() {
 				leaderRetrievalService.stop();
 			}
 
-			for (JobManagerProcess jmProces : jmProcess) {
+			for (DispatcherProcess jmProces : dispatcherProcesses) {
 				if (jmProces != null) {
 					jmProces.destroy();
 				}
@@ -387,6 +379,8 @@ public void run() {
 				highAvailabilityServices.closeAndCleanupAllData();
 			}
 
+			RpcUtils.terminateRpcService(rpcService, timeout);
+
 			// Delete coordination directory
 			if (coordinateTempDir != null) {
 				try {
@@ -398,4 +392,14 @@ public void run() {
 		}
 	}
 
+	private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
+			Time.milliseconds(50L),
+			org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
+			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
+			new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor()))
+			.get();
+	}
+
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b85a410e3a6..4f8a1a2e828 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -21,51 +21,55 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.StringWriter;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * This test makes sure that jobs are canceled properly in cases where
@@ -74,15 +78,36 @@
 @SuppressWarnings("serial")
 public class ProcessFailureCancelingITCase extends TestLogger {
 
+	@Rule
+	public final BlobServerResource blobServerResource = new BlobServerResource();
+
 	@Test
 	public void testCancelingOnProcessFailure() throws Exception {
 		final StringWriter processOutput = new StringWriter();
+		final Time timeout = Time.minutes(2L);
 
-		ActorSystem jmActorSystem = null;
+		RestClusterClient<String> clusterClient = null;
 		Process taskManagerProcess = null;
-		HighAvailabilityServices highAvailabilityServices = null;
+		final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+		Configuration jmConfig = new Configuration();
+		jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+		jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+		jmConfig.setInteger(RestOptions.PORT, 0);
+
+		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig);
+		final int jobManagerPort = rpcService.getPort();
+		jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+		final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory(
+			StandaloneResourceManagerFactory.INSTANCE);
+		DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent = null;
+
+		try (final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+				jmConfig,
+				TestingUtils.defaultExecutor(),
+				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) {
 
-		try {
 			// check that we run this test only if the java command
 			// is available on this machine
 			String javaCommand = getJavaCommandPath();
@@ -96,36 +121,22 @@ public void testCancelingOnProcessFailure() throws Exception {
 			tempLogFile.deleteOnExit();
 			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
-			// find a free port to start the JobManager
-			final int jobManagerPort = NetUtils.getAvailablePort();
-
-			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
-			Configuration jmConfig = new Configuration();
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s");
-			jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s");
-			jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10);
-			jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-			jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
-			jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				jmConfig,
-				TestingUtils.defaultExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(
+			dispatcherResourceManagerComponent = resourceManagerComponentFactory.create(
 				jmConfig,
-				jmActorSystem,
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				highAvailabilityServices,
+				rpcService,
+				haServices,
+				blobServerResource.getBlobServer(),
+				new HeartbeatServices(100L, 1000L),
 				NoOpMetricRegistry.INSTANCE,
-				Option.empty(),
-				JobManager.class,
-				MemoryArchivist.class)._1();
+				new MemoryArchivedExecutionGraphStore(),
+				fatalErrorHandler);
+
+			// update the rest ports
+			final int restPort = dispatcherResourceManagerComponent
+				.getWebMonitorEndpoint()
+				.getServerAddress()
+				.getPort();
+			jmConfig.setInteger(RestOptions.PORT, restPort);
 
 			// the TaskManager java command
 			String[] command = new String[] {
@@ -134,7 +145,7 @@ public void testCancelingOnProcessFailure() throws Exception {
 					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
 					"-Xms80m", "-Xmx80m",
 					"-classpath", getCurrentClasspath(),
-					AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
+					AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(),
 					String.valueOf(jobManagerPort)
 			};
 
@@ -142,21 +153,14 @@ public void testCancelingOnProcessFailure() throws Exception {
 			taskManagerProcess = new ProcessBuilder(command).start();
 			new CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
 
-			// we wait for the JobManager to have the two TaskManagers available
-			// since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-			waitUntilNumTaskManagersAreRegistered(jmActor, 1, 120000);
-
 			final Throwable[] errorRef = new Throwable[1];
 
-			final Configuration configuration = new Configuration();
-			configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
-
 			// start the test program, which infinitely blocks
 			Runnable programRunner = new Runnable() {
 				@Override
 				public void run() {
 					try {
-						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
+						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new Configuration());
 						env.setParallelism(2);
 						env.setRestartStrategy(RestartStrategies.noRestart());
 						env.getConfig().disableSysoutLogging();
@@ -187,15 +191,26 @@ public Long map(Long value) throws Exception {
 			Thread programThread = new Thread(programRunner);
 
 			// kill the TaskManager
+			programThread.start();
+
+			final DispatcherGateway dispatcherGateway = dispatcherResourceManagerComponent
+				.getDispatcher()
+				.getSelfGateway(DispatcherGateway.class);
+			waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);
+
+			clusterClient = new RestClusterClient<>(jmConfig, "standalone");
+
+			final Collection<JobID> jobIds = waitForRunningJobs(clusterClient, timeout);
+
+			assertThat(jobIds, hasSize(1));
+			final JobID jobId = jobIds.iterator().next();
+
+			// kill the TaskManager after the job started to run
 			taskManagerProcess.destroy();
 			taskManagerProcess = null;
 
-			// immediately submit the job. this should hit the case
-			// where the JobManager still thinks it has the TaskManager and tries to send it tasks
-			programThread.start();
-
 			// try to cancel the job
-			cancelRunningJob(jmActor);
+			clusterClient.cancel(jobId);
 
 			// we should see a failure within reasonable time (10s is the ask timeout).
 			// since the CI environment is often slow, we conservatively give it up to 2 minutes,
@@ -223,88 +238,42 @@ public Long map(Long value) throws Exception {
 			if (taskManagerProcess != null) {
 				taskManagerProcess.destroy();
 			}
-			if (jmActorSystem != null) {
-				jmActorSystem.shutdown();
+			if (clusterClient != null) {
+				clusterClient.shutdown();
 			}
-
-			if (highAvailabilityServices != null) {
-				highAvailabilityServices.closeAndCleanupAllData();
+			if (dispatcherResourceManagerComponent != null) {
+				dispatcherResourceManagerComponent.close();
 			}
-		}
-	}
-
-	private void cancelRunningJob(ActorRef jobManager) throws Exception {
-		final FiniteDuration askTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
-		// try at most for 30 seconds
-		final long deadline = System.currentTimeMillis() + 30000;
-
-		JobID jobId = null;
-
-		do {
-			Future<Object> response = Patterns.ask(jobManager,
-					JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
-
-			Object result;
-			try {
-				result = Await.result(response, askTimeout);
-			}
-			catch (Exception e) {
-				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
-			}
-
-			if (result instanceof JobManagerMessages.RunningJobsStatus) {
-
-				List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
 
-				if (jobs.size() == 1) {
-					jobId = jobs.get(0).getJobId();
-					break;
-				}
-			}
-		}
-		while (System.currentTimeMillis() < deadline);
+			fatalErrorHandler.rethrowError();
 
-		if (jobId == null) {
-			// we never found it running, must have failed already
-			return;
+			RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
 		}
-
-		// tell the JobManager to cancel the job
-		jobManager.tell(
-			new JobManagerMessages.LeaderSessionMessage(
-				HighAvailabilityServices.DEFAULT_LEADER_ID,
-				new JobManagerMessages.CancelJob(jobId)),
-			ActorRef.noSender());
 	}
 
-	private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
-			throws Exception {
-		final long deadline = System.currentTimeMillis() + maxDelay;
-		while (true) {
-			long remaining = deadline - System.currentTimeMillis();
-			if (remaining <= 0) {
-				fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
-			}
-
-			FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+	private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException {
+		FutureUtils.retrySuccesfulWithDelay(
+			() -> dispatcherGateway.requestClusterOverview(timeout),
+			Time.milliseconds(50L),
+			Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+			clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= 1 &&
+				clusterOverview.getNumSlotsAvailable() == 0 &&
+				clusterOverview.getNumSlotsTotal() == 2,
+			TestingUtils.defaultScheduledExecutor())
+			.get();
+	}
 
-			try {
-				Future<?> result = Patterns.ask(jobManager,
-						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new Timeout(timeout));
-				Integer numTMs = (Integer) Await.result(result, timeout);
-				if (numTMs == numExpected) {
-					break;
-				}
-			}
-			catch (TimeoutException e) {
-				// ignore and retry
-			}
-			catch (ClassCastException e) {
-				fail("Wrong response: " + e.getMessage());
-			}
-		}
+	private Collection<JobID> waitForRunningJobs(ClusterClient<?> clusterClient, Time timeout) throws ExecutionException, InterruptedException {
+		return FutureUtils.retrySuccesfulWithDelay(
+				CheckedSupplier.unchecked(clusterClient::listJobs),
+				Time.milliseconds(50L),
+				Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+				jobs -> !jobs.isEmpty(),
+				TestingUtils.defaultScheduledExecutor())
+			.get()
+			.stream()
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
 	}
 
 	private void printProcessLog(String processName, String log) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 7dc6f0cf1d6..4815c4938f7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,10 +67,9 @@ public TaskManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode)
 	public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception {
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
 		env.setParallelism(PARALLELISM);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
 		env.getConfig().setExecutionMode(executionMode);
 		env.getConfig().disableSysoutLogging();
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index 766a7993c45..fbf6b5b71e3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -67,7 +66,6 @@ public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir)
 		final File tempCheckpointDir = tempFolder.newFolder();
 
 		final Configuration configuration = new Configuration();
-		configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 			"localhost",
 			jobManagerPort,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
> -------------------------------------------------------------------
>
>                 Key: FLINK-10403
>                 URL: https://issues.apache.org/jira/browse/FLINK-10403
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Port {{JobManagerHAProcessFailureBatchRecoveryITCase}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message