flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] asfgit closed pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional
Date Fri, 28 Sep 2018 09:44:58 GMT
asfgit closed pull request #6743: [FLINK-10411] Make ClusterEntrypoint more compositional
URL: https://github.com/apache/flink/pull/6743
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display the diff of such a pull request once it is merged):

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
new file mode 100644
index 00000000000..3e0645d6859
--- /dev/null
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link JobGraphRetriever} which loads the job class by name through its own class loader and
+ * creates the {@link JobGraph} from it, applying the given savepoint restore settings.
+ */
+public class ClassPathJobGraphRetriever implements JobGraphRetriever {
+
+	@Nonnull
+	private final String jobClassName;
+
+	@Nonnull
+	private final SavepointRestoreSettings savepointRestoreSettings;
+
+	@Nonnull
+	private final String[] programArguments;
+
+	public ClassPathJobGraphRetriever(
+			@Nonnull String jobClassName,
+			@Nonnull SavepointRestoreSettings savepointRestoreSettings,
+			@Nonnull String[] programArguments) {
+		this.jobClassName = jobClassName;
+		this.savepointRestoreSettings = savepointRestoreSettings;
+		this.programArguments = programArguments;
+	}
+
+	@Override
+	public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+		final PackagedProgram packagedProgram = createPackagedProgram();
+		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+		try {
+			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
+			jobGraph.setAllowQueuedScheduling(true);
+			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+
+			return jobGraph;
+		} catch (Exception e) {
+			throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
+		}
+	}
+
+	private PackagedProgram createPackagedProgram() throws FlinkException {
+		try {
+			final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
+			return new PackagedProgram(mainClass, programArguments);
+		} catch (ClassNotFoundException | ProgramInvocationException e) {
+			throw new FlinkException("Could not load the provided entrypoint class.", e);
+		}
+	}
+}
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 0e400950b5c..6c42bf23a22 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -18,38 +18,20 @@
 
 package org.apache.flink.container.entrypoint;
 
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,62 +61,10 @@
 	}
 
 	@Override
-	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
-		final PackagedProgram packagedProgram = createPackagedProgram();
-		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-		try {
-			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
-			jobGraph.setAllowQueuedScheduling(true);
-			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
-
-			return jobGraph;
-		} catch (Exception e) {
-			throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
-		}
-	}
-
-	private PackagedProgram createPackagedProgram() throws FlinkException {
-		try {
-			final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
-			return new PackagedProgram(mainClass, programArguments);
-		} catch (ClassNotFoundException | ProgramInvocationException e) {
-			throw new FlinkException("Could not load the provided entrypoint class.", e);
-		}
-	}
-
-	@Override
-	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
-		terminationFuture.thenAccept((status) -> shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true));
-	}
-
-	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			resourceManagerRuntimeServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new StandaloneResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			resourceManagerRuntimeServices.getSlotManager(),
-			metricRegistry,
-			resourceManagerRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler);
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new JobDispatcherResourceManagerComponentFactory(
+			StandaloneResourceManagerFactory.INSTANCE,
+			new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments));
 	}
 
 	public static void main(String[] args) {
@@ -164,7 +94,6 @@ public static void main(String[] args) {
 			clusterConfiguration.getSavepointRestoreSettings(),
 			clusterConfiguration.getArgs());
 
-		entrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(entrypoint);
 	}
-
 }
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
similarity index 81%
rename from flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
rename to flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
index 2faec4c6627..6e460e1f894 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
@@ -32,24 +32,24 @@
 import static org.junit.Assert.assertThat;
 
 /**
- * Tests for the {@link StandaloneJobClusterEntryPoint}.
+ * Tests for the {@link ClassPathJobGraphRetriever}.
  */
-public class StandaloneJobClusterEntryPointTest extends TestLogger {
+public class ClassPathJobGraphRetrieverTest extends TestLogger {
 
 	public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};
 
 	@Test
 	public void testJobGraphRetrieval() throws FlinkException {
-		final Configuration configuration = new Configuration();
 		final int parallelism = 42;
+		final Configuration configuration = new Configuration();
 		configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
-		final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
-			configuration,
+
+		final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
 			TestJob.class.getCanonicalName(),
 			SavepointRestoreSettings.none(),
 			PROGRAM_ARGUMENTS);
 
-		final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
+		final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
 		assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
 		assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
@@ -59,13 +59,13 @@ public void testJobGraphRetrieval() throws FlinkException {
 	public void testSavepointRestoreSettings() throws FlinkException {
 		final Configuration configuration = new Configuration();
 		final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true);
-		final StandaloneJobClusterEntryPoint jobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
-			configuration,
+
+		final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
 			TestJob.class.getCanonicalName(),
 			savepointRestoreSettings,
 			PROGRAM_ARGUMENTS);
 
-		final JobGraph jobGraph = jobClusterEntryPoint.retrieveJobGraph(configuration);
+		final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
 		assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
 	}
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
index ada434dd8b7..27d1f321d0b 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
@@ -25,7 +25,7 @@
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 
 /**
- * Test job which is used for {@link StandaloneJobClusterEntryPointTest}.
+ * Test job which is used for {@link ClassPathJobGraphRetrieverTest}.
  */
 public class TestJob {
 
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 38b61d86679..377b5b5971a 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -20,31 +20,22 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
@@ -52,13 +43,6 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -66,8 +50,6 @@
  */
 public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
-	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
 	// ------------------------------------------------------------------------
 	//  Command-line options
 	// ------------------------------------------------------------------------
@@ -113,58 +95,6 @@ protected void initializeServices(Configuration config) throws Exception {
 		taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
 	}
 
-	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new MesosResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			configuration,
-			mesosServices,
-			schedulerConfiguration,
-			taskManagerParameters,
-			taskManagerContainerSpec,
-			webInterfaceUrl);
-	}
-
-	@Override
-	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
-		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-		File fp = new File(jobGraphFile);
-
-		try (FileInputStream input = new FileInputStream(fp);
-			ObjectInputStream obInput = new ObjectInputStream(input)) {
-
-			return (JobGraph) obInput.readObject();
-		} catch (FileNotFoundException e) {
-			throw new FlinkException("Could not find the JobGraph file.", e);
-		} catch (ClassNotFoundException | IOException e) {
-			throw new FlinkException("Could not load the JobGraph from file.", e);
-		}
-	}
-
 	@Override
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		final CompletableFuture<Void> serviceShutDownFuture = super.stopClusterServices(cleanupHaData);
@@ -179,7 +109,15 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
 	}
 
 	@Override
-	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {}
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new JobDispatcherResourceManagerComponentFactory(
+			new MesosResourceManagerFactory(
+				mesosServices,
+				schedulerConfiguration,
+				taskManagerParameters,
+				taskManagerContainerSpec),
+			FileJobGraphRetriever.createFrom(configuration));
+	}
 
 	public static void main(String[] args) {
 		// startup checks and logging
@@ -204,6 +142,6 @@ public static void main(String[] args) {
 
 		MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration, dynamicProperties);
 
-		clusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
 	}
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 70369f61f2a..98796282cf2 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -20,25 +20,18 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -49,8 +42,6 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -103,42 +94,6 @@ protected void initializeServices(Configuration config) throws Exception {
 		taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
 	}
 
-	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new MesosResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			configuration,
-			mesosServices,
-			mesosConfig,
-			taskManagerParameters,
-			taskManagerContainerSpec,
-			webInterfaceUrl);
-	}
-
 	@Override
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		final CompletableFuture<Void> serviceShutDownFuture = super.stopClusterServices(cleanupHaData);
@@ -152,6 +107,16 @@ protected void initializeServices(Configuration config) throws Exception {
 			});
 	}
 
+	@Override
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new SessionDispatcherResourceManagerComponentFactory(
+			new MesosResourceManagerFactory(
+				mesosServices,
+				mesosConfig,
+				taskManagerParameters,
+				taskManagerContainerSpec));
+	}
+
 	public static void main(String[] args) {
 		// startup checks and logging
 		EnvironmentInformation.logEnvironmentInfo(LOG, MesosSessionClusterEntrypoint.class.getSimpleName(), args);
@@ -175,7 +140,7 @@ public static void main(String[] args) {
 
 		MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration, dynamicProperties);
 
-		clusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
 	}
 
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
new file mode 100644
index 00000000000..9582e9f2e23
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} which creates a {@link MesosResourceManager} from the Mesos services and settings passed to its constructor.
+ */
+public class MesosResourceManagerFactory implements ResourceManagerFactory<RegisteredMesosWorkerNode> {
+
+	@Nonnull
+	private final MesosServices mesosServices;
+
+	@Nonnull
+	private final MesosConfiguration schedulerConfiguration;
+
+	@Nonnull
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	@Nonnull
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	public MesosResourceManagerFactory(@Nonnull MesosServices mesosServices, @Nonnull MesosConfiguration schedulerConfiguration, @Nonnull MesosTaskManagerParameters taskManagerParameters, @Nonnull ContainerSpecification taskManagerContainerSpec) {
+		this.mesosServices = mesosServices;
+		this.schedulerConfiguration = schedulerConfiguration;
+		this.taskManagerParameters = taskManagerParameters;
+		this.taskManagerContainerSpec = taskManagerContainerSpec;
+	}
+
+	@Override
+	public ResourceManager<RegisteredMesosWorkerNode> createResourceManager(Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception {
+		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			rmServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new MesosResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			highAvailabilityServices,
+			heartbeatServices,
+			rmRuntimeServices.getSlotManager(),
+			metricRegistry,
+			rmRuntimeServices.getJobLeaderIdService(),
+			clusterInformation,
+			fatalErrorHandler,
+			configuration,
+			mesosServices,
+			schedulerConfiguration,
+			taskManagerParameters,
+			taskManagerContainerSpec,
+			webInterfaceUrl);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
new file mode 100644
index 00000000000..59522998690
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link Dispatcher} factory interface.
+ */
+public interface DispatcherFactory<T extends Dispatcher> {
+
+	/**
+	 * Create a {@link Dispatcher} of the given type {@code T}.
+	 */
+	T createDispatcher(
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		ResourceManagerGateway resourceManagerGateway,
+		BlobServer blobServer,
+		HeartbeatServices heartbeatServices,
+		JobManagerMetricGroup jobManagerMetricGroup,
+		@Nullable String metricQueryServicePath,
+		ArchivedExecutionGraphStore archivedExecutionGraphStore,
+		FatalErrorHandler fatalErrorHandler,
+		@Nullable String restAddress,
+		HistoryServerArchivist historyServerArchivist) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
new file mode 100644
index 00000000000..e6b1a26e99c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
+
+/**
+ * {@link DispatcherFactory} which creates a {@link MiniDispatcher}.
+ */
+public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
+
+	private final JobGraphRetriever jobGraphRetriever;
+
+	public JobDispatcherFactory(JobGraphRetriever jobGraphRetriever) {
+		this.jobGraphRetriever = jobGraphRetriever;
+	}
+
+	@Override
+	public MiniDispatcher createDispatcher(
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			ResourceManagerGateway resourceManagerGateway,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			JobManagerMetricGroup jobManagerMetricGroup,
+			@Nullable String metricQueryServicePath,
+			ArchivedExecutionGraphStore archivedExecutionGraphStore,
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String restAddress,
+			HistoryServerArchivist historyServerArchivist) throws Exception {
+		final JobGraph jobGraph = jobGraphRetriever.retrieveJobGraph(configuration);
+
+		final String executionModeValue = configuration.getString(EXECUTION_MODE);
+
+		final ClusterEntrypoint.ExecutionMode executionMode = ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);
+
+		return new MiniDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME,
+			configuration,
+			highAvailabilityServices,
+			resourceManagerGateway,
+			blobServer,
+			heartbeatServices,
+			jobManagerMetricGroup,
+			metricQueryServicePath,
+			archivedExecutionGraphStore,
+			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			fatalErrorHandler,
+			restAddress,
+			historyServerArchivist,
+			jobGraph,
+			executionMode);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
new file mode 100644
index 00000000000..18e29a09711
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}.
+ */
+public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
+	INSTANCE;
+
+	@Override
+	public Dispatcher createDispatcher(
+				Configuration configuration,
+				RpcService rpcService,
+				HighAvailabilityServices highAvailabilityServices,
+				ResourceManagerGateway resourceManagerGateway,
+				BlobServer blobServer,
+				HeartbeatServices heartbeatServices,
+				JobManagerMetricGroup jobManagerMetricGroup,
+				@Nullable String metricQueryServicePath,
+				ArchivedExecutionGraphStore archivedExecutionGraphStore,
+				FatalErrorHandler fatalErrorHandler,
+				@Nullable String restAddress,
+				HistoryServerArchivist historyServerArchivist) throws Exception {
+		// create the default dispatcher
+		return new StandaloneDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME,
+			configuration,
+			highAvailabilityServices,
+			resourceManagerGateway,
+			blobServer,
+			heartbeatServices,
+			jobManagerMetricGroup,
+			metricQueryServicePath,
+			archivedExecutionGraphStore,
+			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			fatalErrorHandler,
+			restAddress,
+			historyServerArchivist);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 1a8c0581192..c9a17227f79 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -32,32 +32,21 @@
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -65,11 +54,6 @@
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -91,7 +75,10 @@
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -109,21 +96,23 @@
 
 	protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);
 
-	protected static final int SUCCESS_RETURN_CODE = 0;
 	protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
 	protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
 
+	private static final Time INITIALIZATION_SHUTDOWN_TIMEOUT = Time.seconds(30L);
+
 	/** The lock to guard startup / shutdown / manipulation methods. */
 	private final Object lock = new Object();
 
 	private final Configuration configuration;
 
-	private final CompletableFuture<Void> terminationFuture;
-
-	private final AtomicBoolean isTerminating = new AtomicBoolean(false);
+	private final CompletableFuture<ApplicationStatus> terminationFuture;
 
 	private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
+	@GuardedBy("lock")
+	private DispatcherResourceManagerComponent<?> clusterComponent;
+
 	@GuardedBy("lock")
 	private MetricRegistryImpl metricRegistry;
 
@@ -139,33 +128,12 @@
 	@GuardedBy("lock")
 	private RpcService commonRpcService;
 
-	@GuardedBy("lock")
-	private ResourceManager<?> resourceManager;
-
-	@GuardedBy("lock")
-	private Dispatcher dispatcher;
-
-	@GuardedBy("lock")
-	private LeaderRetrievalService dispatcherLeaderRetrievalService;
-
-	@GuardedBy("lock")
-	private LeaderRetrievalService resourceManagerRetrievalService;
-
-	@GuardedBy("lock")
-	private WebMonitorEndpoint<?> webMonitorEndpoint;
-
 	@GuardedBy("lock")
 	private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
 	@GuardedBy("lock")
 	private TransientBlobCache transientBlobCache;
 
-	@GuardedBy("lock")
-	private ClusterInformation clusterInformation;
-
-	@GuardedBy("lock")
-	private JobManagerMetricGroup jobManagerMetricGroup;
-
 	private final Thread shutDownHook;
 
 	protected ClusterEntrypoint(Configuration configuration) {
@@ -175,11 +143,11 @@ protected ClusterEntrypoint(Configuration configuration) {
 		shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, getClass().getSimpleName(), LOG);
 	}
 
-	public CompletableFuture<Void> getTerminationFuture() {
+	public CompletableFuture<ApplicationStatus> getTerminationFuture() {
 		return terminationFuture;
 	}
 
-	protected void startCluster() {
+	protected void startCluster() throws ClusterEntrypointException {
 		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
@@ -194,17 +162,24 @@ protected void startCluster() {
 			});
 		} catch (Throwable t) {
 			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
-			LOG.error("Cluster initialization failed.", strippedThrowable);
 
-			shutDownAndTerminate(
-				STARTUP_FAILURE_RETURN_CODE,
-				ApplicationStatus.FAILED,
-				strippedThrowable.getMessage(),
-				false);
+			try {
+				// clean up any partial state
+				shutDownAsync(
+					ApplicationStatus.FAILED,
+					ExceptionUtils.stringifyException(strippedThrowable),
+					false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+			} catch (InterruptedException | ExecutionException | TimeoutException e) {
+				strippedThrowable.addSuppressed(e);
+			}
+
+			throw new ClusterEntrypointException(
+				String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
+				strippedThrowable);
 		}
 	}
 
-	protected void configureFileSystems(Configuration configuration) throws Exception {
+	private void configureFileSystems(Configuration configuration) throws Exception {
 		LOG.info("Install default filesystem.");
 
 		try {
@@ -223,7 +198,7 @@ protected SecurityContext installSecurityContext(Configuration configuration) th
 		return SecurityUtils.getInstalledContext();
 	}
 
-	protected void runCluster(Configuration configuration) throws Exception {
+	private void runCluster(Configuration configuration) throws Exception {
 		synchronized (lock) {
 			initializeServices(configuration);
 
@@ -231,27 +206,33 @@ protected void runCluster(Configuration configuration) throws Exception {
 			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
 			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
 
-			startClusterComponents(
+			final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
+
+			clusterComponent = dispatcherResourceManagerComponentFactory.create(
 				configuration,
 				commonRpcService,
 				haServices,
 				blobServer,
 				heartbeatServices,
-				metricRegistry);
+				metricRegistry,
+				archivedExecutionGraphStore,
+				this);
 
-			dispatcher.getTerminationFuture().whenComplete(
-				(Void value, Throwable throwable) -> {
+			clusterComponent.getShutDownFuture().whenComplete(
+				(ApplicationStatus applicationStatus, Throwable throwable) -> {
 					if (throwable != null) {
-						LOG.info("Could not properly terminate the Dispatcher.", throwable);
+						shutDownAsync(
+							ApplicationStatus.UNKNOWN,
+							ExceptionUtils.stringifyException(throwable),
+							false);
+					} else {
+						// This is the general shutdown path. If a separate more specific shutdown was
+						// already triggered, this will do nothing
+						shutDownAsync(
+							applicationStatus,
+							null,
+							true);
 					}
-
-					// This is the general shutdown path. If a separate more specific shutdown was
-					// already triggered, this will do nothing
-					shutDownAndTerminate(
-						SUCCESS_RETURN_CODE,
-						ApplicationStatus.SUCCEEDED,
-						throwable != null ? throwable.getMessage() : null,
-						true);
 				});
 		}
 	}
@@ -283,99 +264,11 @@ protected void initializeServices(Configuration configuration) throws Exception
 
 			archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
 
-			clusterInformation = new ClusterInformation(
-				commonRpcService.getAddress(),
-				blobServer.getPort());
-
 			transientBlobCache = new TransientBlobCache(
 				configuration,
 				new InetSocketAddress(
-					clusterInformation.getBlobServerHostname(),
-					clusterInformation.getBlobServerPort()));
-		}
-	}
-
-	protected void startClusterComponents(
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry) throws Exception {
-		synchronized (lock) {
-			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
-
-			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
-
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
-				rpcService,
-				DispatcherGateway.class,
-				DispatcherId::fromUuid,
-				10,
-				Time.milliseconds(50L));
-
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
-				rpcService,
-				ResourceManagerGateway.class,
-				ResourceManagerId::fromUuid,
-				10,
-				Time.milliseconds(50L));
-
-			// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
-			final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
-			final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
-
-			webMonitorEndpoint = createRestEndpoint(
-				configuration,
-				dispatcherGatewayRetriever,
-				resourceManagerGatewayRetriever,
-				transientBlobCache,
-				rpcService.getExecutor(),
-				new AkkaQueryServiceRetriever(actorSystem, timeout),
-				highAvailabilityServices.getWebMonitorLeaderElectionService());
-
-			LOG.debug("Starting Dispatcher REST endpoint.");
-			webMonitorEndpoint.start();
-
-			resourceManager = createResourceManager(
-				configuration,
-				ResourceID.generate(),
-				rpcService,
-				highAvailabilityServices,
-				heartbeatServices,
-				metricRegistry,
-				this,
-				clusterInformation,
-				webMonitorEndpoint.getRestBaseUrl());
-
-			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
-				metricRegistry,
-				rpcService.getAddress(),
-				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
-
-			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
-
-			dispatcher = createDispatcher(
-				configuration,
-				rpcService,
-				highAvailabilityServices,
-				resourceManager.getSelfGateway(ResourceManagerGateway.class),
-				blobServer,
-				heartbeatServices,
-				jobManagerMetricGroup,
-				metricRegistry.getMetricQueryServicePath(),
-				archivedExecutionGraphStore,
-				this,
-				webMonitorEndpoint.getRestBaseUrl(),
-				historyServerArchivist);
-
-			LOG.debug("Starting ResourceManager.");
-			resourceManager.start();
-			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
-
-			LOG.debug("Starting Dispatcher.");
-			dispatcher.start();
-			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+					commonRpcService.getAddress(),
+					blobServer.getPort()));
 		}
 	}
 
@@ -477,63 +370,6 @@ protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
 		}
 	}
 
-	protected CompletableFuture<Void> stopClusterComponents() {
-		synchronized (lock) {
-
-			Exception exception = null;
-
-			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
-
-			if (dispatcherLeaderRetrievalService != null) {
-				try {
-					dispatcherLeaderRetrievalService.stop();
-				} catch (Exception e) {
-					exception = ExceptionUtils.firstOrSuppressed(e, exception);
-				}
-			}
-
-			if (resourceManagerRetrievalService != null) {
-				try {
-					resourceManagerRetrievalService.stop();
-				} catch (Exception e) {
-					exception = ExceptionUtils.firstOrSuppressed(e, exception);
-				}
-			}
-
-			if (webMonitorEndpoint != null) {
-				terminationFutures.add(webMonitorEndpoint.closeAsync());
-			}
-
-			if (dispatcher != null) {
-				dispatcher.shutDown();
-				terminationFutures.add(dispatcher.getTerminationFuture());
-			}
-
-			if (resourceManager != null) {
-				resourceManager.shutDown();
-				terminationFutures.add(resourceManager.getTerminationFuture());
-			}
-
-			if (exception != null) {
-				terminationFutures.add(FutureUtils.completedExceptionally(exception));
-			}
-
-			final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
-
-			if (jobManagerMetricGroup != null) {
-				return FutureUtils.runAfterwards(
-					componentTerminationFuture,
-					() -> {
-						synchronized (lock) {
-							jobManagerMetricGroup.close();
-						}
-					});
-			} else {
-				return componentTerminationFuture;
-			}
-		}
-	}
-
 	@Override
 	public void onFatalError(Throwable exception) {
 		LOG.error("Fatal error occurred in the cluster entrypoint.", exception);
@@ -556,21 +392,20 @@ private Configuration generateClusterConfiguration(Configuration configuration)
 		return resultConfiguration;
 	}
 
-	private CompletableFuture<Void> shutDownAsync(
-			boolean cleanupHaData,
+	private CompletableFuture<ApplicationStatus> shutDownAsync(
 			ApplicationStatus applicationStatus,
-			@Nullable String diagnostics) {
+			@Nullable String diagnostics,
+			boolean cleanupHaData) {
 		if (isShutDown.compareAndSet(false, true)) {
-			LOG.info("Stopping {}.", getClass().getSimpleName());
-
-			final CompletableFuture<Void> shutDownApplicationFuture = deregisterApplication(applicationStatus, diagnostics);
+			LOG.info("Shutting {} down with application status {}. Diagnostics {}.",
+				getClass().getSimpleName(),
+				applicationStatus,
+				diagnostics);
 
-			final CompletableFuture<Void> componentShutdownFuture = FutureUtils.composeAfterwards(
-				shutDownApplicationFuture,
-				this::stopClusterComponents);
+			final CompletableFuture<Void> shutDownApplicationFuture = closeClusterComponent(applicationStatus, diagnostics);
 
 			final CompletableFuture<Void> serviceShutdownFuture = FutureUtils.composeAfterwards(
-				componentShutdownFuture,
+				shutDownApplicationFuture,
 				() -> stopClusterServices(cleanupHaData));
 
 			final CompletableFuture<Void> cleanupDirectoriesFuture = FutureUtils.runAfterwards(
@@ -582,7 +417,7 @@ private Configuration generateClusterConfiguration(Configuration configuration)
 					if (serviceThrowable != null) {
 						terminationFuture.completeExceptionally(serviceThrowable);
 					} else {
-						terminationFuture.complete(null);
+						terminationFuture.complete(applicationStatus);
 					}
 				});
 		}
@@ -590,36 +425,6 @@ private Configuration generateClusterConfiguration(Configuration configuration)
 		return terminationFuture;
 	}
 
-	protected void shutDownAndTerminate(
-		int returnCode,
-		ApplicationStatus applicationStatus,
-		@Nullable String diagnostics,
-		boolean cleanupHaData) {
-
-		if (isTerminating.compareAndSet(false, true)) {
-			LOG.info("Shut down and terminate {} with return code {} and application status {}.",
-				getClass().getSimpleName(),
-				returnCode,
-				applicationStatus);
-
-			shutDownAsync(
-				cleanupHaData,
-				applicationStatus,
-				diagnostics).whenComplete(
-				(Void ignored, Throwable t) -> {
-					if (t != null) {
-						LOG.info("Could not properly shut down cluster entrypoint.", t);
-					}
-
-					System.exit(returnCode);
-				});
-		} else {
-			LOG.debug("Concurrent termination call detected. Ignoring termination call with return code {} and application status {}.",
-				returnCode,
-				applicationStatus);
-		}
-	}
-
 	/**
 	 * Deregister the Flink application from the resource management system by signalling
 	 * the {@link ResourceManager}.
@@ -628,11 +433,12 @@ protected void shutDownAndTerminate(
 	 * @param diagnostics additional information about the shut down, can be {@code null}
 	 * @return Future which is completed once the shut down
 	 */
-	private CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
+	private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
 		synchronized (lock) {
-			if (resourceManager != null) {
-				final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-				return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+			if (clusterComponent != null) {
+				final CompletableFuture<Void> deregisterApplicationFuture = clusterComponent.deregisterApplication(applicationStatus, diagnostics);
+
+				return FutureUtils.runAfterwards(deregisterApplicationFuture, clusterComponent::closeAsync);
 			} else {
 				return CompletableFuture.completedFuture(null);
 			}
@@ -656,39 +462,7 @@ private void cleanupDirectories() throws IOException {
 	// Abstract methods
 	// --------------------------------------------------
 
-	protected abstract Dispatcher createDispatcher(
-		Configuration configuration,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		ResourceManagerGateway resourceManagerGateway,
-		BlobServer blobServer,
-		HeartbeatServices heartbeatServices,
-		JobManagerMetricGroup jobManagerMetricGroup,
-		@Nullable String metricQueryServicePath,
-		ArchivedExecutionGraphStore archivedExecutionGraphStore,
-		FatalErrorHandler fatalErrorHandler,
-		@Nullable String restAddress,
-		HistoryServerArchivist historyServerArchivist) throws Exception;
-
-	protected abstract ResourceManager<?> createResourceManager(
-		Configuration configuration,
-		ResourceID resourceId,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
-		FatalErrorHandler fatalErrorHandler,
-		ClusterInformation clusterInformation,
-		@Nullable String webInterfaceUrl) throws Exception;
-
-	protected abstract WebMonitorEndpoint<?> createRestEndpoint(
-		Configuration configuration,
-		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-		TransientBlobService transientBlobService,
-		Executor executor,
-		MetricQueryServiceRetriever metricQueryServiceRetriever,
-		LeaderElectionService leaderElectionService) throws Exception;
+	protected abstract DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration);
 
 	protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 		Configuration configuration,
@@ -719,6 +493,34 @@ protected static Configuration loadConfiguration(EntrypointClusterConfiguration
 		return configuration;
 	}
 
+	// --------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------
+
+	public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
+
+		final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
+		try {
+			clusterEntrypoint.startCluster();
+		} catch (ClusterEntrypointException e) {
+			LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
+		}
+
+		clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
+			final int returnCode;
+
+			if (throwable != null) {
+				returnCode = RUNTIME_FAILURE_RETURN_CODE;
+			} else {
+				returnCode = applicationStatus.processExitCode();
+			}
+
+			LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
+			System.exit(returnCode);
+		});
+	}
+
 	/**
 	 * Execution mode of the {@link MiniDispatcher}.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
new file mode 100644
index 00000000000..21d37ee01ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exceptions thrown by the {@link ClusterEntrypoint}.
+ */
+public class ClusterEntrypointException extends FlinkException {
+	private static final long serialVersionUID = -3855286807063809945L;
+
+	public ClusterEntrypointException(String message) {
+		super(message);
+	}
+
+	public ClusterEntrypointException(Throwable cause) {
+		super(cause);
+	}
+
+	public ClusterEntrypointException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 80a9da278c1..0426df9aabd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -19,35 +19,9 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MiniDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.FlinkException;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for per-job cluster entry points.
@@ -58,82 +32,10 @@ public JobClusterEntrypoint(Configuration configuration) {
 		super(configuration);
 	}
 
-	@Override
-	protected MiniDispatcherRestEndpoint createRestEndpoint(
-			Configuration configuration,
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-			TransientBlobService transientBlobService,
-			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
-			LeaderElectionService leaderElectionService) throws Exception {
-		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
-
-		return new MiniDispatcherRestEndpoint(
-			RestServerEndpointConfiguration.fromConfiguration(configuration),
-			dispatcherGatewayRetriever,
-			configuration,
-			restHandlerConfiguration,
-			resourceManagerGatewayRetriever,
-			transientBlobService,
-			executor,
-			metricQueryServiceRetriever,
-			leaderElectionService,
-			this);
-	}
-
 	@Override
 	protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 			Configuration configuration,
 			ScheduledExecutor scheduledExecutor) {
 		return new MemoryArchivedExecutionGraphStore();
 	}
-
-	@Override
-	protected Dispatcher createDispatcher(
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			ResourceManagerGateway resourceManagerGateway,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			JobManagerMetricGroup jobManagerMetricGroup,
-			@Nullable String metricQueryServicePath,
-			ArchivedExecutionGraphStore archivedExecutionGraphStore,
-			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress,
-			HistoryServerArchivist historyServerArchivist) throws Exception {
-
-		final JobGraph jobGraph = retrieveJobGraph(configuration);
-
-		final String executionModeValue = configuration.getString(EXECUTION_MODE);
-
-		final ExecutionMode executionMode = ExecutionMode.valueOf(executionModeValue);
-
-		final MiniDispatcher dispatcher = new MiniDispatcher(
-			rpcService,
-			Dispatcher.DISPATCHER_NAME,
-			configuration,
-			highAvailabilityServices,
-			resourceManagerGateway,
-			blobServer,
-			heartbeatServices,
-			jobManagerMetricGroup,
-			metricQueryServicePath,
-			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-			fatalErrorHandler,
-			restAddress,
-			historyServerArchivist,
-			jobGraph,
-			executionMode);
-
-		registerShutdownActions(dispatcher.getJobTerminationFuture());
-
-		return dispatcher;
-	}
-
-	protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
-
-	protected abstract void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 40eb8b76104..1fb693cf83b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -22,35 +22,14 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
-import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
 
-import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for session cluster entry points.
@@ -77,62 +56,4 @@ protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 			scheduledExecutor,
 			Ticker.systemTicker());
 	}
-
-	@Override
-	protected DispatcherRestEndpoint createRestEndpoint(
-			Configuration configuration,
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-			TransientBlobService transientBlobService,
-			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
-			LeaderElectionService leaderElectionService) throws Exception {
-
-		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
-
-		return new DispatcherRestEndpoint(
-			RestServerEndpointConfiguration.fromConfiguration(configuration),
-			dispatcherGatewayRetriever,
-			configuration,
-			restHandlerConfiguration,
-			resourceManagerGatewayRetriever,
-			transientBlobService,
-			executor,
-			metricQueryServiceRetriever,
-			leaderElectionService,
-			this);
-	}
-
-	@Override
-	protected Dispatcher createDispatcher(
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			ResourceManagerGateway resourceManagerGateway,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			JobManagerMetricGroup jobManagerMetricGroup,
-			@Nullable String metricQueryServicePath,
-			ArchivedExecutionGraphStore archivedExecutionGraphStore,
-			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress,
-			HistoryServerArchivist historyServerArchivist) throws Exception {
-
-		// create the default dispatcher
-		return new StandaloneDispatcher(
-			rpcService,
-			Dispatcher.DISPATCHER_NAME,
-			configuration,
-			highAvailabilityServices,
-			resourceManagerGateway,
-			blobServer,
-			heartbeatServices,
-			jobManagerMetricGroup,
-			metricQueryServicePath,
-			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-			fatalErrorHandler,
-			restAddress,
-			historyServerArchivist);
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index edff87b783d..127fc8b9831 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -19,24 +19,14 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 
-import javax.annotation.Nullable;
-
 /**
  * Entry point for the standalone session cluster.
  */
@@ -47,33 +37,8 @@ public StandaloneSessionClusterEntrypoint(Configuration configuration) {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			resourceManagerRuntimeServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new StandaloneResourceManager(
-			rpcService,
-			FlinkResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			resourceManagerRuntimeServices.getSlotManager(),
-			metricRegistry,
-			resourceManagerRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler);
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
 	}
 
 	public static void main(String[] args) {
@@ -97,6 +62,6 @@ public static void main(String[] args) {
 
 		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
 
-		entrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(entrypoint);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 00000000000..0a374117841
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components.
+ *
+ * @param <T> type of the {@link Dispatcher}
+ * @param <U> type of the {@link RestfulGateway} given to the {@link WebMonitorEndpoint}
+ */
+public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> {
+
+	private final Logger log = LoggerFactory.getLogger(getClass());
+
+	@Nonnull
+	private final DispatcherFactory<T> dispatcherFactory;
+
+	@Nonnull
+	private final ResourceManagerFactory<?> resourceManagerFactory;
+
+	@Nonnull
+	private final RestEndpointFactory<U> restEndpointFactory;
+
+	public AbstractDispatcherResourceManagerComponentFactory(
+			@Nonnull DispatcherFactory<T> dispatcherFactory,
+			@Nonnull ResourceManagerFactory<?> resourceManagerFactory,
+			@Nonnull RestEndpointFactory<U> restEndpointFactory) {
+		this.dispatcherFactory = dispatcherFactory;
+		this.resourceManagerFactory = resourceManagerFactory;
+		this.restEndpointFactory = restEndpointFactory;
+	}
+
+	@Override
+	public DispatcherResourceManagerComponent<T> create(
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			ArchivedExecutionGraphStore archivedExecutionGraphStore,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+
+		LeaderRetrievalService dispatcherLeaderRetrievalService = null;
+		LeaderRetrievalService resourceManagerRetrievalService = null;
+		WebMonitorEndpoint<U> webMonitorEndpoint = null;
+		ResourceManager<?> resourceManager = null;
+		JobManagerMetricGroup jobManagerMetricGroup = null;
+		T dispatcher = null;
+
+		try {
+			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
+
+			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
+
+			final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+				rpcService,
+				DispatcherGateway.class,
+				DispatcherId::fromUuid,
+				10,
+				Time.milliseconds(50L));
+
+			final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+				rpcService,
+				ResourceManagerGateway.class,
+				ResourceManagerId::fromUuid,
+				10,
+				Time.milliseconds(50L));
+
+			// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
+			final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+			final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
+				configuration,
+				dispatcherGatewayRetriever,
+				resourceManagerGatewayRetriever,
+				blobServer,
+				rpcService.getExecutor(),
+				new AkkaQueryServiceRetriever(actorSystem, timeout),
+				highAvailabilityServices.getWebMonitorLeaderElectionService(),
+				fatalErrorHandler);
+
+			log.debug("Starting Dispatcher REST endpoint.");
+			webMonitorEndpoint.start();
+
+			resourceManager = resourceManagerFactory.createResourceManager(
+				configuration,
+				ResourceID.generate(),
+				rpcService,
+				highAvailabilityServices,
+				heartbeatServices,
+				metricRegistry,
+				fatalErrorHandler,
+				new ClusterInformation(rpcService.getAddress(), blobServer.getPort()),
+				webMonitorEndpoint.getRestBaseUrl());
+
+			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
+				metricRegistry,
+				rpcService.getAddress(),
+				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+
+			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
+
+			dispatcher = dispatcherFactory.createDispatcher(
+				configuration,
+				rpcService,
+				highAvailabilityServices,
+				resourceManager.getSelfGateway(ResourceManagerGateway.class),
+				blobServer,
+				heartbeatServices,
+				jobManagerMetricGroup,
+				metricRegistry.getMetricQueryServicePath(),
+				archivedExecutionGraphStore,
+				fatalErrorHandler,
+				webMonitorEndpoint.getRestBaseUrl(),
+				historyServerArchivist);
+
+			log.debug("Starting ResourceManager.");
+			resourceManager.start();
+			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
+
+			log.debug("Starting Dispatcher.");
+			dispatcher.start();
+			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+
+			return createDispatcherResourceManagerComponent(
+				dispatcher,
+				resourceManager,
+				dispatcherLeaderRetrievalService,
+				resourceManagerRetrievalService,
+				webMonitorEndpoint,
+				jobManagerMetricGroup);
+
+		} catch (Exception exception) {
+			// clean up all started components
+			if (dispatcherLeaderRetrievalService != null) {
+				try {
+					dispatcherLeaderRetrievalService.stop();
+				} catch (Exception e) {
+					exception = ExceptionUtils.firstOrSuppressed(e, exception);
+				}
+			}
+
+			if (resourceManagerRetrievalService != null) {
+				try {
+					resourceManagerRetrievalService.stop();
+				} catch (Exception e) {
+					exception = ExceptionUtils.firstOrSuppressed(e, exception);
+				}
+			}
+
+			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
+
+			if (webMonitorEndpoint != null) {
+				terminationFutures.add(webMonitorEndpoint.closeAsync());
+			}
+
+			if (resourceManager != null) {
+				resourceManager.shutDown();
+				terminationFutures.add(resourceManager.getTerminationFuture());
+			}
+
+			if (dispatcher != null) {
+				dispatcher.shutDown();
+				terminationFutures.add(dispatcher.getTerminationFuture());
+			}
+
+			final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);
+
+			try {
+				terminationFuture.get();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			if (jobManagerMetricGroup != null) {
+				jobManagerMetricGroup.close();
+			}
+
+			throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
+		}
+	}
+
+	protected abstract DispatcherResourceManagerComponent<T> createDispatcherResourceManagerComponent(
+		T dispatcher,
+		ResourceManager<?> resourceManager,
+		LeaderRetrievalService dispatcherLeaderRetrievalService,
+		LeaderRetrievalService resourceManagerRetrievalService,
+		WebMonitorEndpoint<?> webMonitorEndpoint,
+		JobManagerMetricGroup jobManagerMetricGroup);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
new file mode 100644
index 00000000000..94925b2aba0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class DispatcherResourceManagerComponent<T extends Dispatcher> implements AutoCloseableAsync {
+
+	@Nonnull
+	private final T dispatcher;
+
+	@Nonnull
+	private final ResourceManager<?> resourceManager;
+
+	@Nonnull
+	private final LeaderRetrievalService dispatcherLeaderRetrievalService;
+
+	@Nonnull
+	private final LeaderRetrievalService resourceManagerRetrievalService;
+
+	@Nonnull
+	private final WebMonitorEndpoint<?> webMonitorEndpoint;
+
+	@Nonnull
+	private final JobManagerMetricGroup jobManagerMetricGroup;
+
+	private final CompletableFuture<Void> terminationFuture;
+
+	private final CompletableFuture<ApplicationStatus> shutDownFuture;
+
+	private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+	DispatcherResourceManagerComponent(
+			@Nonnull T dispatcher,
+			@Nonnull ResourceManager<?> resourceManager,
+			@Nonnull LeaderRetrievalService dispatcherLeaderRetrievalService,
+			@Nonnull LeaderRetrievalService resourceManagerRetrievalService,
+			@Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+			@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+		this.resourceManager = resourceManager;
+		this.dispatcher = dispatcher;
+		this.dispatcherLeaderRetrievalService = dispatcherLeaderRetrievalService;
+		this.resourceManagerRetrievalService = resourceManagerRetrievalService;
+		this.webMonitorEndpoint = webMonitorEndpoint;
+		this.jobManagerMetricGroup = jobManagerMetricGroup;
+		this.terminationFuture = new CompletableFuture<>();
+		this.shutDownFuture = new CompletableFuture<>();
+
+		registerShutDownFuture();
+	}
+
+	private void registerShutDownFuture() {
+		terminationFuture.whenComplete(
+			(aVoid, throwable) -> {
+				if (throwable != null) {
+					shutDownFuture.completeExceptionally(throwable);
+				} else {
+					shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+				}
+			});
+
+		dispatcher
+			.getTerminationFuture()
+			.whenComplete(
+				(aVoid, throwable) -> {
+					if (throwable != null) {
+						shutDownFuture.completeExceptionally(throwable);
+					} else {
+						shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+					}
+				});
+	}
+
+	public CompletableFuture<Void> getTerminationFuture() {
+		return terminationFuture;
+	}
+
+	public final CompletableFuture<ApplicationStatus> getShutDownFuture() {
+		return shutDownFuture;
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		if (isRunning.compareAndSet(true, false)) {
+			Exception exception = null;
+
+			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
+
+			try {
+				dispatcherLeaderRetrievalService.stop();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			try {
+				resourceManagerRetrievalService.stop();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			terminationFutures.add(webMonitorEndpoint.closeAsync());
+
+			dispatcher.shutDown();
+			terminationFutures.add(dispatcher.getTerminationFuture());
+
+			resourceManager.shutDown();
+			terminationFutures.add(resourceManager.getTerminationFuture());
+
+			if (exception != null) {
+				terminationFutures.add(FutureUtils.completedExceptionally(exception));
+			}
+
+			final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+
+			final CompletableFuture<Void> metricGroupTerminationFuture = FutureUtils.runAfterwards(
+				componentTerminationFuture,
+				jobManagerMetricGroup::close);
+
+			metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
+				if (throwable != null) {
+					terminationFuture.completeExceptionally(throwable);
+				} else {
+					terminationFuture.complete(aVoid);
+				}
+			});
+		}
+
+		return terminationFuture;
+	}
+
+	/**
+	 * Deregister the Flink application from the resource management system by signalling
+	 * the {@link ResourceManager}.
+	 *
+	 * @param applicationStatus to terminate the application with
+	 * @param diagnostics additional information about the shut down, can be {@code null}
+	 * @return Future which is completed once the application has been deregistered
+	 */
+	public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
+		final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+		return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
new file mode 100644
index 00000000000..df22a59d955
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Factory for the {@link DispatcherResourceManagerComponent}.
+ */
+public interface DispatcherResourceManagerComponentFactory<T extends Dispatcher> {
+
+	DispatcherResourceManagerComponent<T> create(
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		BlobServer blobServer,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry,
+		ArchivedExecutionGraphStore archivedExecutionGraphStore,
+		FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
new file mode 100644
index 00000000000..1848408e9fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * {@link JobGraphRetriever} implementation which retrieves the {@link JobGraph} by
+ * deserializing it with an {@link ObjectInputStream} from the file path passed to the constructor.
+ */
+public class FileJobGraphRetriever implements JobGraphRetriever {
+
+	public static final ConfigOption<String> JOB_GRAPH_FILE_PATH = ConfigOptions
+		.key("internal.jobgraph-path")
+		.defaultValue("job.graph");
+
+	@Nonnull
+	private final String jobGraphFile;
+
+	public FileJobGraphRetriever(@Nonnull String jobGraphFile) {
+		this.jobGraphFile = jobGraphFile;
+	}
+
+	@Override
+	public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+		File fp = new File(jobGraphFile);
+
+		try (FileInputStream input = new FileInputStream(fp);
+			ObjectInputStream obInput = new ObjectInputStream(input)) {
+
+			return (JobGraph) obInput.readObject();
+		} catch (FileNotFoundException e) {
+			throw new FlinkException("Could not find the JobGraph file.", e);
+		} catch (ClassNotFoundException | IOException e) {
+			throw new FlinkException("Could not load the JobGraph from file.", e);
+		}
+	}
+
+	public static FileJobGraphRetriever createFrom(Configuration configuration) {
+		return new FileJobGraphRetriever(configuration.getString(JOB_GRAPH_FILE_PATH));
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
new file mode 100644
index 00000000000..c1df47fe98e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link DispatcherResourceManagerComponent} for a job cluster which takes a {@link MiniDispatcher}
+ * and forwards the result or failure of its job termination future to the shut down future.
+ */
+class JobDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent<MiniDispatcher> {
+
+	JobDispatcherResourceManagerComponent(
+			MiniDispatcher dispatcher,
+			ResourceManager<?> resourceManager,
+			LeaderRetrievalService dispatcherLeaderRetrievalService,
+			LeaderRetrievalService resourceManagerRetrievalService,
+			WebMonitorEndpoint<?> webMonitorEndpoint,
+			JobManagerMetricGroup jobManagerMetricGroup) {
+		super(dispatcher, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint, jobManagerMetricGroup);
+
+		final CompletableFuture<ApplicationStatus> shutDownFuture = getShutDownFuture();
+
+		dispatcher.getJobTerminationFuture().whenComplete((applicationStatus, throwable) -> {
+			if (throwable != null) {
+				shutDownFuture.completeExceptionally(throwable);
+			} else {
+				shutDownFuture.complete(applicationStatus);
+			}
+		});
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 00000000000..c7ce14c1db3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
+import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link DispatcherResourceManagerComponentFactory} for a {@link JobDispatcherResourceManagerComponent}, wired with a {@link JobDispatcherFactory} built from the given {@link JobGraphRetriever} and with the {@link JobRestEndpointFactory}.
+ */
+public class JobDispatcherResourceManagerComponentFactory extends AbstractDispatcherResourceManagerComponentFactory<MiniDispatcher, RestfulGateway> {
+
+	public JobDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFactory<?> resourceManagerFactory, @Nonnull JobGraphRetriever jobGraphRetriever) {
+		super(new JobDispatcherFactory(jobGraphRetriever), resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
+	}
+
+	@Override
+	protected DispatcherResourceManagerComponent<MiniDispatcher> createDispatcherResourceManagerComponent(
+			MiniDispatcher dispatcher,
+			ResourceManager<?> resourceManager,
+			LeaderRetrievalService dispatcherLeaderRetrievalService,
+			LeaderRetrievalService resourceManagerRetrievalService,
+			WebMonitorEndpoint<?> webMonitorEndpoint,
+			JobManagerMetricGroup jobManagerMetricGroup) {
+		return new JobDispatcherResourceManagerComponent(
+			dispatcher,
+			resourceManager,
+			dispatcherLeaderRetrievalService,
+			resourceManagerRetrievalService,
+			webMonitorEndpoint,
+			jobManagerMetricGroup);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
new file mode 100644
index 00000000000..b1586ea1afe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Interface for retrieving a {@link JobGraph}; implementations decide where the graph is obtained from.
+ */
+public interface JobGraphRetriever {
+
+	/**
+	 * Retrieves the {@link JobGraph}.
+	 *
+	 * @param configuration cluster configuration
+	 * @return the retrieved {@link JobGraph}
+	 * @throws FlinkException if the {@link JobGraph} could not be retrieved
+	 */
+	JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
new file mode 100644
index 00000000000..8be7b74c86f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+/**
+ * {@link DispatcherResourceManagerComponent} used by session clusters; it only passes its arguments on to the base class constructor.
+ */
+class SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent<Dispatcher> {
+	SessionDispatcherResourceManagerComponent(
+			Dispatcher dispatcher,
+			ResourceManager<?> resourceManager,
+			LeaderRetrievalService dispatcherLeaderRetrievalService,
+			LeaderRetrievalService resourceManagerRetrievalService,
+			WebMonitorEndpoint<?> webMonitorEndpoint,
+			JobManagerMetricGroup jobManagerMetricGroup) {
+		super(dispatcher, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint, jobManagerMetricGroup);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 00000000000..c44833df2ff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link DispatcherResourceManagerComponentFactory} for a {@link SessionDispatcherResourceManagerComponent}, wired with the {@link SessionDispatcherFactory} and the {@link SessionRestEndpointFactory}.
+ */
+public class SessionDispatcherResourceManagerComponentFactory extends AbstractDispatcherResourceManagerComponentFactory<Dispatcher, DispatcherGateway> {
+
+	public SessionDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFactory<?> resourceManagerFactory) {
+		super(SessionDispatcherFactory.INSTANCE, resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
+	}
+
+	@Override
+	protected DispatcherResourceManagerComponent<Dispatcher> createDispatcherResourceManagerComponent(
+			Dispatcher dispatcher,
+			ResourceManager<?> resourceManager,
+			LeaderRetrievalService dispatcherLeaderRetrievalService,
+			LeaderRetrievalService resourceManagerRetrievalService,
+			WebMonitorEndpoint<?> webMonitorEndpoint,
+			JobManagerMetricGroup jobManagerMetricGroup) {
+		return new SessionDispatcherResourceManagerComponent(
+			dispatcher,
+			resourceManager,
+			dispatcherLeaderRetrievalService,
+			resourceManagerRetrievalService,
+			webMonitorEndpoint,
+			jobManagerMetricGroup);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
new file mode 100644
index 00000000000..91a7b267594
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * Factory which creates a {@link ResourceManager} from the given configuration and runtime services.
+ *
+ * @param <T> type of the workers of the ResourceManager
+ */
+public interface ResourceManagerFactory<T extends ResourceIDRetrievable> {
+
+	ResourceManager<T> createResourceManager(
+		Configuration configuration,
+		ResourceID resourceId,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry,
+		FatalErrorHandler fatalErrorHandler,
+		ClusterInformation clusterInformation,
+		@Nullable String webInterfaceUrl) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
new file mode 100644
index 00000000000..c8e314f659c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} which creates a {@link StandaloneResourceManager} using the slot manager and job leader id service of {@link ResourceManagerRuntimeServices} built from the configuration; the {@code webInterfaceUrl} argument is not used.
+ */
+public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<ResourceID> {
+	INSTANCE;
+
+	@Override
+	public ResourceManager<ResourceID> createResourceManager(
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler,
+			ClusterInformation clusterInformation,
+			@Nullable String webInterfaceUrl) throws Exception {
+		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			resourceManagerRuntimeServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new StandaloneResourceManager(
+			rpcService,
+			FlinkResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			highAvailabilityServices,
+			heartbeatServices,
+			resourceManagerRuntimeServices.getSlotManager(),
+			metricRegistry,
+			resourceManagerRuntimeServices.getJobLeaderIdService(),
+			clusterInformation,
+			fatalErrorHandler);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
new file mode 100644
index 00000000000..da4b0633f40
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link MiniDispatcherRestEndpoint}, deriving its {@link RestServerEndpointConfiguration} and {@link RestHandlerConfiguration} from the given configuration.
+ */
+public enum JobRestEndpointFactory implements RestEndpointFactory<RestfulGateway> {
+	INSTANCE;
+
+	@Override
+	public WebMonitorEndpoint<RestfulGateway> createRestEndpoint(
+			Configuration configuration,
+			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+			TransientBlobService transientBlobService,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			LeaderElectionService leaderElectionService,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
+
+		return new MiniDispatcherRestEndpoint(
+			RestServerEndpointConfiguration.fromConfiguration(configuration),
+			dispatcherGatewayRetriever,
+			configuration,
+			restHandlerConfiguration,
+			resourceManagerGatewayRetriever,
+			transientBlobService,
+			executor,
+			metricQueryServiceRetriever,
+			leaderElectionService,
+			fatalErrorHandler);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
new file mode 100644
index 00000000000..ffdc0cbc39e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Factory which creates a {@link WebMonitorEndpoint} from the given configuration, gateway retrievers and supporting services.
+ *
+ * @param <T> type of the {@link RestfulGateway}
+ */
+public interface RestEndpointFactory<T extends RestfulGateway> {
+
+	WebMonitorEndpoint<T> createRestEndpoint(
+		Configuration configuration,
+		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+		TransientBlobService transientBlobService,
+		Executor executor,
+		MetricQueryServiceRetriever metricQueryServiceRetriever,
+		LeaderElectionService leaderElectionService,
+		FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
new file mode 100644
index 00000000000..359efbfa18e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}, deriving its {@link RestServerEndpointConfiguration} and {@link RestHandlerConfiguration} from the given configuration.
+ */
+public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
+	INSTANCE;
+
+	@Override
+	public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
+			Configuration configuration,
+			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+			TransientBlobService transientBlobService,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			LeaderElectionService leaderElectionService,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
+
+		return new DispatcherRestEndpoint(
+			RestServerEndpointConfiguration.fromConfiguration(configuration),
+			dispatcherGatewayRetriever,
+			configuration,
+			restHandlerConfiguration,
+			resourceManagerGatewayRetriever,
+			transientBlobService,
+			executor,
+			metricQueryServiceRetriever,
+			leaderElectionService,
+			fatalErrorHandler);
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 482e2b70cb6..42e666eda88 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -19,48 +19,28 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Entry point for Yarn per-job clusters.
  */
 public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 
-	/** The job graph file path. */
-	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
 	private final String workingDirectory;
 
 	public YarnJobClusterEntrypoint(
@@ -82,58 +62,10 @@ protected String getRPCPortRange(Configuration configuration) {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new YarnResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			configuration,
-			System.getenv(),
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			webInterfaceUrl);
-	}
-
-	@Override
-	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
-		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-		File fp = new File(jobGraphFile);
-
-		try (FileInputStream input = new FileInputStream(fp);
-			ObjectInputStream obInput = new ObjectInputStream(input)) {
-
-			return (JobGraph) obInput.readObject();
-		} catch (FileNotFoundException e) {
-			throw new FlinkException("Could not find the JobGraph file.", e);
-		} catch (ClassNotFoundException | IOException e) {
-			throw new FlinkException("Could not load the JobGraph from file.", e);
-		}
-	}
-
-	@Override
-	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
-		terminationFuture.thenAccept((status) ->
-			shutDownAndTerminate(status.processExitCode(), status, null, true));
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new JobDispatcherResourceManagerComponentFactory(
+			YarnResourceManagerFactory.INSTANCE,
+			FileJobGraphRetriever.createFrom(configuration));
 	}
 
 	// ------------------------------------------------------------------------
@@ -167,6 +99,6 @@ public static void main(String[] args) {
 			configuration,
 			workingDirectory);
 
-		yarnJobClusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
new file mode 100644
index 00000000000..bfd1b4a07ed
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.yarn.YarnResourceManager;
+import org.apache.flink.yarn.YarnWorkerNode;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} implementation which creates a {@link YarnResourceManager}.
+ */
+public enum YarnResourceManagerFactory implements ResourceManagerFactory<YarnWorkerNode> {
+	INSTANCE;
+
+	@Override
+	public ResourceManager<YarnWorkerNode> createResourceManager(
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler,
+			ClusterInformation clusterInformation,
+			@Nullable String webInterfaceUrl) throws Exception {
+		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			rmServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new YarnResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			configuration,
+			System.getenv(),
+			highAvailabilityServices,
+			heartbeatServices,
+			rmRuntimeServices.getSlotManager(),
+			metricRegistry,
+			rmRuntimeServices.getJobLeaderIdService(),
+			clusterInformation,
+			fatalErrorHandler,
+			webInterfaceUrl);
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 6c928611910..0f4656e62a4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -19,29 +19,19 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -70,36 +60,8 @@ protected String getRPCPortRange(Configuration configuration) {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new YarnResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			configuration,
-			System.getenv(),
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			webInterfaceUrl);
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.INSTANCE);
 	}
 
 	public static void main(String[] args) {
@@ -128,6 +90,6 @@ public static void main(String[] args) {
 			configuration,
 			workingDirectory);
 
-		yarnSessionClusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(yarnSessionClusterEntrypoint);
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message