flink-commits mailing list archives

From trohrm...@apache.org
Subject [03/13] flink git commit: [FLINK-8705] [flip6] Add DispatcherRestEndpoint to MiniCluster
Date Wed, 21 Feb 2018 22:56:22 GMT
[FLINK-8705] [flip6] Add DispatcherRestEndpoint to MiniCluster

In order to properly support the RemoteEnvironment, the Flip-6 MiniCluster
needs a REST endpoint to receive requests from the RestClusterClient.

This closes #5527.
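
As an aside (not part of this commit), the sketch below illustrates how a client-side caller such as the LocalExecutor can start the Flip-6 MiniCluster and then discover the port of the new dispatcher REST endpoint, so that a RestClusterClient (e.g. behind a RemoteEnvironment) can be pointed at it. It only uses the configuration keys and MiniCluster methods that appear in the diff; the class name, the Builder#build() call, and the single-TaskManager setting are illustrative assumptions.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class MiniClusterRestExample {

	public static void main(String[] args) throws Exception {
		final Configuration configuration = new Configuration();

		// let the REST endpoint bind to a random free port
		configuration.setInteger(RestOptions.REST_PORT, 0);

		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
			.setConfiguration(configuration)
			.setNumTaskManagers(1)
			.build();

		final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
		miniCluster.start();

		// the actual port of the DispatcherRestEndpoint; writing it back into the
		// configuration mirrors what the LocalExecutor change below does
		final int restPort = miniCluster.getRestAddress().getPort();
		configuration.setInteger(RestOptions.REST_PORT, restPort);
		System.out.println("Dispatcher REST endpoint is listening on port " + restPort);

		miniCluster.close();
	}
}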


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a18f053
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a18f053
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a18f053

Branch: refs/heads/master
Commit: 2a18f0532b22edf8b821ef883598f4ee52367711
Parents: facf2ac
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Feb 20 17:48:27 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Feb 21 22:49:05 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  |  15 +-
 .../org/apache/flink/client/RemoteExecutor.java |   2 +
 .../client/program/rest/RestClusterClient.java  |  18 +-
 .../flink/runtime/jobmaster/JobResult.java      |  39 ++
 .../runtime/minicluster/JobExecutorService.java |  14 +-
 .../flink/runtime/minicluster/MiniCluster.java  | 337 ++++++++----
 .../minicluster/MiniClusterConfiguration.java   |  32 --
 .../minicluster/MiniClusterJobDispatcher.java   | 506 -------------------
 .../minicluster/StandaloneMiniCluster.java      |  14 +-
 .../resourcemanager/ResourceManagerRunner.java  |   4 +
 .../runtime/minicluster/FlinkMiniCluster.scala  |   2 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  31 +-
 .../org/apache/flink/api/scala/FlinkShell.scala |   4 +-
 .../flink/api/scala/ScalaShellITCase.scala      |  26 +-
 .../scala/ScalaShellLocalStartupITCase.scala    |  22 +-
 .../Flip6LocalStreamEnvironment.java            |   5 +
 .../flink/test/util/MiniClusterResource.java    |  13 +-
 .../test/operators/RemoteEnvironmentITCase.java |  50 +-
 .../recovery/ProcessFailureCancelingITCase.java |   6 +-
 19 files changed, 421 insertions(+), 719 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 7d71f48..415e33c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.Program;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -125,14 +125,11 @@ public class LocalExecutor extends PlanExecutor {
 	private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
 		final JobExecutorService newJobExecutorService;
 		if (CoreOptions.FLIP6_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+
+			configuration.setInteger(RestOptions.REST_PORT, 0);
+
 			final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
 				.setConfiguration(configuration)
-				.setNumJobManagers(
-					configuration.getInteger(
-						ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
-						ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER))
-				.setNumResourceManagers(
-					configuration.getInteger(ResourceManagerOptions.LOCAL_NUMBER_RESOURCE_MANAGER))
 				.setNumTaskManagers(
 					configuration.getInteger(
 						ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
@@ -146,6 +143,8 @@ public class LocalExecutor extends PlanExecutor {
 			final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
 			miniCluster.start();
 
+			configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+
 			newJobExecutorService = miniCluster;
 		} else {
 			final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
@@ -161,7 +160,7 @@ public class LocalExecutor extends PlanExecutor {
 	public void stop() throws Exception {
 		synchronized (lock) {
 			if (jobExecutorService != null) {
-				jobExecutorService.terminate().get();
+				jobExecutorService.close();
 				jobExecutorService = null;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 7551da0..1afa087 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -112,6 +113,7 @@ public class RemoteExecutor extends PlanExecutor {
 
 		clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
 		clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
+		clientConfiguration.setInteger(RestOptions.REST_PORT, inet.getPort());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 2f53545..a02d6cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -19,10 +19,8 @@
 package org.apache.flink.client.program.rest;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -78,7 +76,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
@@ -219,20 +216,11 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 				throw new ProgramInvocationException("Could not retrieve the execution result.", ExceptionUtils.stripExecutionException(e));
 			}
 
-			if (jobResult.getSerializedThrowable().isPresent()) {
-				final SerializedThrowable serializedThrowable = jobResult.getSerializedThrowable().get();
-				final Throwable throwable = serializedThrowable.deserializeError(classLoader);
-				throw new ProgramInvocationException(throwable);
-			}
-
 			try {
-				this.lastJobExecutionResult = new JobExecutionResult(
-					jobResult.getJobId(),
-					jobResult.getNetRuntime(),
-					AccumulatorHelper.deserializeAccumulators(
-						jobResult.getAccumulatorResults(),
-						classLoader));
+				this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
 				return lastJobExecutionResult;
+			} catch (JobResult.WrappedJobException we) {
+				throw new ProgramInvocationException(we.getCause());
 			} catch (IOException | ClassNotFoundException e) {
 				throw new ProgramInvocationException(e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index 5a5e713..28fbc30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
@@ -99,6 +103,29 @@ public class JobResult implements Serializable {
 	}
 
 	/**
+	 * Converts the {@link JobResult} to a {@link JobExecutionResult}.
+	 *
+	 * @param classLoader to use for deserialization
+	 * @return JobExecutionResult
+	 * @throws WrappedJobException if the JobResult contains a serialized exception
+	 * @throws IOException if the accumulator could not be deserialized
+	 * @throws ClassNotFoundException if the accumulator could not be deserialized
+	 */
+	public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws WrappedJobException, IOException, ClassNotFoundException {
+		if (serializedThrowable != null) {
+			final Throwable throwable = serializedThrowable.deserializeError(classLoader);
+			throw new WrappedJobException(throwable);
+		}
+
+		return new JobExecutionResult(
+			jobId,
+			netRuntime,
+			AccumulatorHelper.deserializeAccumulators(
+				accumulatorResults,
+				classLoader));
+	}
+
+	/**
 	 * Builder for {@link JobResult}.
 	 */
 	@Internal
@@ -175,4 +202,16 @@ public class JobResult implements Serializable {
 		return builder.build();
 	}
 
+	/**
+	 * Exception which indicates that the job has finished with an {@link Exception}.
+	 */
+	public static final class WrappedJobException extends FlinkException {
+
+		private static final long serialVersionUID = 6535061898650156019L;
+
+		public WrappedJobException(Throwable cause) {
+			super(cause);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
index 03d2447..13bdf46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
@@ -18,20 +18,10 @@
 
 package org.apache.flink.runtime.minicluster;
 
-import java.util.concurrent.CompletableFuture;
+import org.apache.flink.util.AutoCloseableAsync;
 
 /**
  * Interface to control {@link JobExecutor}.
  */
-public interface JobExecutorService extends JobExecutor {
-
-	/**
-	 * Terminate the given JobExecutorService.
-	 *
-	 * <p>This method can be implemented asynchronously. Therefore it returns a future
-	 * which is completed once the termination has been done.
-	 *
-	 * @return Termination future which can also contain an exception if the termination went wrong
-	 */
-	CompletableFuture<?> terminate();
+public interface JobExecutorService extends JobExecutor, AutoCloseableAsync {
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 7692ddf..5046ae7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -29,24 +30,39 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 
 import akka.actor.ActorSystem;
@@ -55,19 +71,21 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
-import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Flip-6 based MiniCluster.
  */
-public class MiniCluster implements JobExecutorService {
+public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
 
@@ -77,6 +95,8 @@ public class MiniCluster implements JobExecutorService {
 	/** The configuration for this mini cluster. */
 	private final MiniClusterConfiguration miniClusterConfiguration;
 
+	private final Time rpcTimeout;
+
 	@GuardedBy("lock")
 	private MetricRegistryImpl metricRegistry;
 
@@ -84,13 +104,13 @@ public class MiniCluster implements JobExecutorService {
 	private RpcService commonRpcService;
 
 	@GuardedBy("lock")
-	private RpcService[] jobManagerRpcServices;
+	private RpcService jobManagerRpcService;
 
 	@GuardedBy("lock")
 	private RpcService[] taskManagerRpcServices;
 
 	@GuardedBy("lock")
-	private RpcService[] resourceManagerRpcServices;
+	private RpcService resourceManagerRpcService;
 
 	@GuardedBy("lock")
 	private HighAvailabilityServices haServices;
@@ -105,12 +125,27 @@ public class MiniCluster implements JobExecutorService {
 	private BlobCacheService blobCacheService;
 
 	@GuardedBy("lock")
-	private ResourceManagerRunner[] resourceManagerRunners;
+	private ResourceManagerRunner resourceManagerRunner;
 
 	private volatile TaskExecutor[] taskManagers;
 
 	@GuardedBy("lock")
-	private MiniClusterJobDispatcher jobDispatcher;
+	private DispatcherRestEndpoint dispatcherRestEndpoint;
+
+	@GuardedBy("lock")
+	private URI restAddressURI;
+
+	@GuardedBy("lock")
+	private LeaderRetrievalService resourceManagerLeaderRetriever;
+
+	@GuardedBy("lock")
+	private LeaderRetrievalService dispatcherLeaderRetriever;
+
+	@GuardedBy("lock")
+	private StandaloneDispatcher dispatcher;
+
+	@GuardedBy("lock")
+	private RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever;
 
 	/** Flag marking the mini cluster as started/running. */
 	private volatile boolean running;
@@ -125,9 +160,17 @@ public class MiniCluster implements JobExecutorService {
 	public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
 		this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null");
 
+		this.rpcTimeout = Time.seconds(10L);
 		running = false;
 	}
 
+	public URI getRestAddress() {
+		synchronized (lock) {
+			checkState(running, "MiniCluster is not yet running.");
+			return restAddressURI;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  life cycle
 	// ------------------------------------------------------------------------
@@ -154,9 +197,7 @@ public class MiniCluster implements JobExecutorService {
 
 			final Configuration configuration = miniClusterConfiguration.getConfiguration();
 			final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
-			final int numJobManagers = miniClusterConfiguration.getNumJobManagers();
 			final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
-			final int numResourceManagers = miniClusterConfiguration.getNumResourceManagers();
 			final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;
 
 			try {
@@ -165,9 +206,9 @@ public class MiniCluster implements JobExecutorService {
 				LOG.info("Starting Metrics Registry");
 				metricRegistry = createMetricRegistry(configuration);
 
-				RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
-				RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
-				RpcService[] resourceManagerRpcServices = new RpcService[numResourceManagers];
+				final RpcService jobManagerRpcService;
+				final RpcService resourceManagerRpcService;
+				final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
 
 				// bring up all the RPC services
 				LOG.info("Starting RPC Service(s)");
@@ -180,19 +221,15 @@ public class MiniCluster implements JobExecutorService {
 				metricRegistry.startQueryService(actorSystem, null);
 
 				if (useSingleRpcService) {
-					// set that same RPC service for all JobManagers and TaskManagers
-					for (int i = 0; i < numJobManagers; i++) {
-						jobManagerRpcServices[i] = commonRpcService;
-					}
 					for (int i = 0; i < numTaskManagers; i++) {
 						taskManagerRpcServices[i] = commonRpcService;
 					}
-					for (int i = 0; i < numResourceManagers; i++) {
-						resourceManagerRpcServices[i] = commonRpcService;
-					}
 
-					this.resourceManagerRpcServices = null;
-					this.jobManagerRpcServices = null;
+					jobManagerRpcService = commonRpcService;
+					resourceManagerRpcService = commonRpcService;
+
+					this.resourceManagerRpcService = null;
+					this.jobManagerRpcService = null;
 					this.taskManagerRpcServices = null;
 				}
 				else {
@@ -201,24 +238,17 @@ public class MiniCluster implements JobExecutorService {
 					final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
 					final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
 
-					for (int i = 0; i < numJobManagers; i++) {
-						jobManagerRpcServices[i] = createRpcService(
-								configuration, rpcTimeout, true, jobManagerBindAddress);
-					}
+					jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
+					resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);
 
 					for (int i = 0; i < numTaskManagers; i++) {
 						taskManagerRpcServices[i] = createRpcService(
 								configuration, rpcTimeout, true, taskManagerBindAddress);
 					}
 
-					for (int i = 0; i < numResourceManagers; i++) {
-						resourceManagerRpcServices[i] = createRpcService(
-								configuration, rpcTimeout, true, resourceManagerBindAddress);
-					}
-
-					this.jobManagerRpcServices = jobManagerRpcServices;
+					this.jobManagerRpcService = jobManagerRpcService;
 					this.taskManagerRpcServices = taskManagerRpcServices;
-					this.resourceManagerRpcServices = resourceManagerRpcServices;
+					this.resourceManagerRpcService = resourceManagerRpcService;
 				}
 
 				// create the high-availability services
@@ -233,14 +263,13 @@ public class MiniCluster implements JobExecutorService {
 				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 
 				// bring up the ResourceManager(s)
-				LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
-				resourceManagerRunners = startResourceManagers(
+				LOG.info("Starting ResourceManager");
+				resourceManagerRunner = startResourceManager(
 					configuration,
 					haServices,
 					heartbeatServices,
 					metricRegistry,
-					numResourceManagers,
-					resourceManagerRpcServices,
+					resourceManagerRpcService,
 					new ClusterInformation("localhost", blobServer.getPort()));
 
 				blobCacheService = new BlobCacheService(
@@ -258,16 +287,64 @@ public class MiniCluster implements JobExecutorService {
 					numTaskManagers,
 					taskManagerRpcServices);
 
+				// starting the dispatcher rest endpoint
+				LOG.info("Starting dispatcher rest endpoint.");
+
+				dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+					jobManagerRpcService,
+					DispatcherGateway.class,
+					DispatcherId::new,
+					20,
+					Time.milliseconds(20L));
+				final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+					jobManagerRpcService,
+					ResourceManagerGateway.class,
+					ResourceManagerId::new,
+					20,
+					Time.milliseconds(20L));
+
+				this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
+					RestServerEndpointConfiguration.fromConfiguration(configuration),
+					dispatcherGatewayRetriever,
+					configuration,
+					RestHandlerConfiguration.fromConfiguration(configuration),
+					resourceManagerGatewayRetriever,
+					blobServer.getTransientBlobService(),
+					commonRpcService.getExecutor(),
+					new AkkaQueryServiceRetriever(
+						actorSystem,
+						Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
+					haServices.getWebMonitorLeaderElectionService(),
+					new ShutDownFatalErrorHandler());
+
+				dispatcherRestEndpoint.start();
+
+				restAddressURI = new URI(dispatcherRestEndpoint.getRestAddress());
+
 				// bring up the dispatcher that launches JobManagers when jobs submitted
-				LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers);
-				jobDispatcher = new MiniClusterJobDispatcher(
+				LOG.info("Starting job dispatcher(s) for JobManager");
+
+				dispatcher = new StandaloneDispatcher(
+					jobManagerRpcService,
+					Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
 					configuration,
 					haServices,
+					resourceManagerRunner.getResourceManageGateway(),
 					blobServer,
 					heartbeatServices,
 					metricRegistry,
-					numJobManagers,
-					jobManagerRpcServices);
+					new MemoryArchivedExecutionGraphStore(),
+					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+					new ShutDownFatalErrorHandler(),
+					dispatcherRestEndpoint.getRestAddress());
+
+				dispatcher.start();
+
+				resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
+				dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
+
+				resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
+				dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
 			}
 			catch (Exception e) {
 				// cleanup everything
@@ -319,26 +396,51 @@ public class MiniCluster implements JobExecutorService {
 		Throwable exception = null;
 
 		// cancel all jobs and shut down the job dispatcher
-		if (jobDispatcher != null) {
+		if (dispatcher != null) {
 			try {
-				jobDispatcher.shutdown();
+				RpcUtils.terminateRpcEndpoint(dispatcher, rpcTimeout);
 			} catch (Exception e) {
 				exception = e;
 			}
-			jobDispatcher = null;
+			dispatcher = null;
 		}
 
-		if (resourceManagerRunners != null) {
-			for (ResourceManagerRunner rm : resourceManagerRunners) {
-				if (rm != null) {
-					try {
-						rm.shutDown();
-					} catch (Throwable t) {
-						exception = firstOrSuppressed(t, exception);
-					}
-				}
+		if (dispatcherRestEndpoint != null) {
+			try {
+				dispatcherRestEndpoint.shutDownAsync().get();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			dispatcherRestEndpoint = null;
+		}
+
+		if (resourceManagerLeaderRetriever != null) {
+			try {
+				resourceManagerLeaderRetriever.stop();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			resourceManagerLeaderRetriever = null;
+		}
+
+		if (dispatcherLeaderRetriever != null) {
+			try {
+				dispatcherLeaderRetriever.stop();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			dispatcherLeaderRetriever = null;
+		}
+
+		if (resourceManagerRunner != null) {
+			try {
+				resourceManagerRunner.shutDown();
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
 			}
-			resourceManagerRunners = null;
 		}
 
 		if (taskManagers != null) {
@@ -349,7 +451,7 @@ public class MiniCluster implements JobExecutorService {
 						// wait for the TaskManager to properly terminate
 						tm.getTerminationFuture().get();
 					} catch (Throwable t) {
-						exception = firstOrSuppressed(t, exception);
+						exception = ExceptionUtils.firstOrSuppressed(t, exception);
 					}
 				}
 			}
@@ -363,30 +465,29 @@ public class MiniCluster implements JobExecutorService {
 
 		// shut down the RpcServices
 		exception = shutDownRpc(commonRpcService, exception);
-		exception = shutDownRpcs(jobManagerRpcServices, exception);
+		exception = shutDownRpc(jobManagerRpcService, exception);
 		exception = shutDownRpcs(taskManagerRpcServices, exception);
-		exception = shutDownRpcs(resourceManagerRpcServices, exception);
+		exception = shutDownRpc(resourceManagerRpcService, exception);
 		commonRpcService = null;
-		jobManagerRpcServices = null;
+		jobManagerRpcService = null;
 		taskManagerRpcServices = null;
-		resourceManagerRpcServices = null;
+		resourceManagerRpcService = null;
 
 		if (blobCacheService != null) {
 			try {
 				blobCacheService.close();
 			} catch (Exception e) {
-				exception = firstOrSuppressed(e, exception);
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 			blobCacheService = null;
 		}
 
-
 		// shut down the blob server
 		if (blobServer != null) {
 			try {
 				blobServer.close();
 			} catch (Exception e) {
-				exception = firstOrSuppressed(e, exception);
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 			blobServer = null;
 		}
@@ -396,7 +497,7 @@ public class MiniCluster implements JobExecutorService {
 			try {
 				haServices.closeAndCleanupAllData();
 			} catch (Exception e) {
-				exception = firstOrSuppressed(e, exception);
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 			haServices = null;
 		}
@@ -466,12 +567,26 @@ public class MiniCluster implements JobExecutorService {
 	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 	 *         or if the job terminally failed.
 	 */
-	public void runDetached(JobGraph job) throws JobExecutionException {
+	public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
-		synchronized (lock) {
-			checkState(running, "mini cluster is not running");
-			jobDispatcher.runDetached(job);
+		final DispatcherGateway currentDispatcherGateway;
+		try {
+			currentDispatcherGateway = getDispatcherGateway();
+		} catch (LeaderRetrievalException e) {
+			throw new JobExecutionException(job.getJobID(), e);
+		}
+
+		// we have to allow queued scheduling in Flip-6 mode because we need to request slots
+		// from the ResourceManager
+		job.setAllowQueuedScheduling(true);
+
+		final CompletableFuture<Acknowledge> submissionFuture = currentDispatcherGateway.submitJob(job, rpcTimeout);
+
+		try {
+			submissionFuture.get();
+		} catch (ExecutionException e) {
+			throw new JobExecutionException(job.getJobID(), ExceptionUtils.stripExecutionException(e));
 		}
 	}
 
@@ -489,17 +604,48 @@ public class MiniCluster implements JobExecutorService {
 	public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
-		MiniClusterJobDispatcher dispatcher;
-		synchronized (lock) {
-			checkState(running, "mini cluster is not running");
-			dispatcher = this.jobDispatcher;
+		final DispatcherGateway currentDispatcherGateway;
+		try {
+			currentDispatcherGateway = getDispatcherGateway();
+		} catch (LeaderRetrievalException e) {
+			throw new JobExecutionException(job.getJobID(), e);
 		}
 
 		// we have to allow queued scheduling in Flip-6 mode because we need to request slots
 		// from the ResourceManager
 		job.setAllowQueuedScheduling(true);
 
-		return dispatcher.runJobBlocking(job);
+		final CompletableFuture<Acknowledge> submissionFuture = currentDispatcherGateway.submitJob(job, rpcTimeout);
+
+		final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
+			(Acknowledge ack) -> currentDispatcherGateway.requestJobResult(job.getJobID(), RpcUtils.INF_TIMEOUT));
+
+		final JobResult jobResult;
+
+		try {
+			jobResult = jobResultFuture.get();
+		} catch (ExecutionException e) {
+			throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
+		}
+
+		try {
+			return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
+		} catch (JobResult.WrappedJobException e) {
+			throw new JobExecutionException(job.getJobID(), e.getCause());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new JobExecutionException(job.getJobID(), e);
+		}
+	}
+
+	private DispatcherGateway getDispatcherGateway() throws LeaderRetrievalException, InterruptedException {
+		synchronized (lock) {
+			checkState(running, "MiniCluster is not yet running.");
+			try {
+				return dispatcherGatewayRetriever.getFuture().get();
+			} catch (ExecutionException e) {
+				throw new LeaderRetrievalException("Could not retrieve the leading dispatcher.", ExceptionUtils.stripExecutionException(e));
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -545,33 +691,27 @@ public class MiniCluster implements JobExecutorService {
 		return new AkkaRpcService(actorSystem, askTimeout);
 	}
 
-	protected ResourceManagerRunner[] startResourceManagers(
+	protected ResourceManagerRunner startResourceManager(
 			Configuration configuration,
 			HighAvailabilityServices haServices,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
-			int numResourceManagers,
-			RpcService[] resourceManagerRpcServices,
+			RpcService resourceManagerRpcService,
 			ClusterInformation clusterInformation) throws Exception {
 
-		final ResourceManagerRunner[] resourceManagerRunners = new ResourceManagerRunner[numResourceManagers];
-
-		for (int i = 0; i < numResourceManagers; i++) {
+		final ResourceManagerRunner resourceManagerRunner = new ResourceManagerRunner(
+			ResourceID.generate(),
+			FlinkResourceManager.RESOURCE_MANAGER_NAME + '_' + UUID.randomUUID(),
+			configuration,
+			resourceManagerRpcService,
+			haServices,
+			heartbeatServices,
+			metricRegistry,
+			clusterInformation);
 
-			resourceManagerRunners[i] = new ResourceManagerRunner(
-				ResourceID.generate(),
-				FlinkResourceManager.RESOURCE_MANAGER_NAME + '_' + i,
-				configuration,
-				resourceManagerRpcServices[i],
-				haServices,
-				heartbeatServices,
-				metricRegistry,
-				clusterInformation);
+			resourceManagerRunner.start();
 
-			resourceManagerRunners[i].start();
-		}
-
-		return resourceManagerRunners;
+		return resourceManagerRunner;
 	}
 
 	protected TaskExecutor[] startTaskManagers(
@@ -619,7 +759,7 @@ public class MiniCluster implements JobExecutorService {
 				rpcService.stopService();
 			}
 			catch (Throwable t) {
-				return firstOrSuppressed(t, priorException);
+				return ExceptionUtils.firstOrSuppressed(t, priorException);
 			}
 		}
 
@@ -637,7 +777,7 @@ public class MiniCluster implements JobExecutorService {
 					}
 				}
 				catch (Throwable t) {
-					exception = firstOrSuppressed(t, exception);
+					exception = ExceptionUtils.firstOrSuppressed(t, exception);
 				}
 			}
 		}
@@ -645,7 +785,7 @@ public class MiniCluster implements JobExecutorService {
 	}
 
 	@Override
-	public CompletableFuture<?> terminate() {
+	public CompletableFuture<Void> closeAsync() {
 		try {
 			shutdown();
 			return CompletableFuture.completedFuture(null);
@@ -679,4 +819,17 @@ public class MiniCluster implements JobExecutorService {
 			}
 		}
 	}
+
+	private class ShutDownFatalErrorHandler implements FatalErrorHandler {
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			LOG.warn("Error in MiniCluster. Shutting the MiniCluster down.", exception);
+			try {
+				shutdown();
+			} catch (Exception e) {
+				LOG.warn("Could not shut down the MiniCluster.", e);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 52e037c..08af0c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -39,12 +39,8 @@ public class MiniClusterConfiguration {
 
 	private final UnmodifiableConfiguration configuration;
 
-	private final int numJobManagers;
-
 	private final int numTaskManagers;
 
-	private final int numResourceManagers;
-
 	private final RpcServiceSharing rpcServiceSharing;
 
 	@Nullable
@@ -56,16 +52,12 @@ public class MiniClusterConfiguration {
 
 	public MiniClusterConfiguration(
 			Configuration configuration,
-			int numJobManagers,
 			int numTaskManagers,
-			int numResourceManagers,
 			RpcServiceSharing rpcServiceSharing,
 			@Nullable String commonBindAddress) {
 
 		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
-		this.numJobManagers = numJobManagers;
 		this.numTaskManagers = numTaskManagers;
-		this.numResourceManagers = numResourceManagers;
 		this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
 		this.commonBindAddress = commonBindAddress;
 	}
@@ -78,18 +70,10 @@ public class MiniClusterConfiguration {
 		return rpcServiceSharing;
 	}
 
-	public int getNumJobManagers() {
-		return numJobManagers;
-	}
-
 	public int getNumTaskManagers() {
 		return numTaskManagers;
 	}
 
-	public int getNumResourceManagers() {
-		return numResourceManagers;
-	}
-
 	public String getJobManagerBindAddress() {
 		return commonBindAddress != null ?
 				commonBindAddress :
@@ -121,9 +105,7 @@ public class MiniClusterConfiguration {
 	public String toString() {
 		return "MiniClusterConfiguration {" +
 				"singleRpcService=" + rpcServiceSharing +
-				", numJobManagers=" + numJobManagers +
 				", numTaskManagers=" + numTaskManagers +
-				", numResourceManagers=" + numResourceManagers +
 				", commonBindAddress='" + commonBindAddress + '\'' +
 				", config=" + configuration +
 				'}';
@@ -151,10 +133,8 @@ public class MiniClusterConfiguration {
 	 */
 	public static class Builder {
 		private Configuration configuration = new Configuration();
-		private int numJobManagers = 1;
 		private int numTaskManagers = 1;
 		private int numSlotsPerTaskManager = 1;
-		private int numResourceManagers = 1;
 		private RpcServiceSharing rpcServiceSharing = SHARED;
 		@Nullable
 		private String commonBindAddress = null;
@@ -164,11 +144,6 @@ public class MiniClusterConfiguration {
 			return this;
 		}
 
-		public Builder setNumJobManagers(int numJobManagers) {
-			this.numJobManagers = numJobManagers;
-			return this;
-		}
-
 		public Builder setNumTaskManagers(int numTaskManagers) {
 			this.numTaskManagers = numTaskManagers;
 			return this;
@@ -179,11 +154,6 @@ public class MiniClusterConfiguration {
 			return this;
 		}
 
-		public Builder setNumResourceManagers(int numResourceManagers) {
-			this.numResourceManagers = numResourceManagers;
-			return this;
-		}
-
 		public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
 			this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
 			return this;
@@ -200,9 +170,7 @@ public class MiniClusterConfiguration {
 
 			return new MiniClusterConfiguration(
 				modifiedConfiguration,
-				numJobManagers,
 				numTaskManagers,
-				numResourceManagers,
 				rpcServiceSharing,
 				commonBindAddress);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
deleted file mode 100644
index ef7a6e4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.minicluster;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
-import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
- * upon receiving jobs.
- */
-public class MiniClusterJobDispatcher {
-
-	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
-
-	// ------------------------------------------------------------------------
-
-	/** lock to ensure that this dispatcher executes only one job at a time. */
-	private final Object lock = new Object();
-
-	/** the configuration with which the mini cluster was started. */
-	private final Configuration configuration;
-
-	/** the RPC services to use by the job managers. */
-	private final RpcService[] rpcServices;
-
-	/** services for discovery, leader election, and recovery. */
-	private final HighAvailabilityServices haServices;
-
-	/** services for heartbeating. */
-	private final HeartbeatServices heartbeatServices;
-
-	/** BlobServer for storing blobs. */
-	private final BlobServer blobServer;
-
-	/** all the services that the JobManager needs, such as BLOB service, factories, etc. */
-	private final JobManagerSharedServices jobManagerSharedServices;
-
-	/** Registry for all metrics in the mini cluster. */
-	private final MetricRegistry metricRegistry;
-
-	/** The number of JobManagers to launch (more than one simulates a high-availability setup). */
-	private final int numJobManagers;
-
-	/** The runner for the job and master. non-null if a job is currently running. */
-	private volatile JobManagerRunner[] runners;
-
-	/** flag marking the dispatcher as hut down. */
-	private volatile boolean shutdown;
-
-
-	/**
-	 * Starts a mini cluster job dispatcher.
-	 *
-	 * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
-	 * non-highly-available setup.
-	 *
-	 * @param config The configuration of the mini cluster
-	 * @param haServices Access to the discovery, leader election, and recovery services
-	 *
-	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
-	 */
-	public MiniClusterJobDispatcher(
-			Configuration config,
-			RpcService rpcService,
-			HighAvailabilityServices haServices,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry) throws Exception {
-		this(
-			config,
-			haServices,
-			blobServer,
-			heartbeatServices,
-			metricRegistry,
-			1,
-			new RpcService[] { rpcService });
-	}
-
-	/**
-	 * Starts a mini cluster job dispatcher.
-	 *
-	 * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
-	 * a highly-available setup.
-	 *
-	 * @param config The configuration of the mini cluster
-	 * @param haServices Access to the discovery, leader election, and recovery services
-	 * @param numJobManagers The number of JobMasters to start for each job.
-	 *
-	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
-	 */
-	public MiniClusterJobDispatcher(
-			Configuration config,
-			HighAvailabilityServices haServices,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			int numJobManagers,
-			RpcService[] rpcServices) throws Exception {
-
-		checkArgument(numJobManagers >= 1);
-		checkArgument(rpcServices.length == numJobManagers);
-
-		this.configuration = checkNotNull(config);
-		this.rpcServices = rpcServices;
-		this.haServices = checkNotNull(haServices);
-		this.heartbeatServices = checkNotNull(heartbeatServices);
-		this.blobServer = checkNotNull(blobServer);
-		this.metricRegistry = checkNotNull(metricRegistry);
-		this.numJobManagers = numJobManagers;
-
-		LOG.info("Creating JobMaster services");
-		this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(config, blobServer);
-	}
-
-	// ------------------------------------------------------------------------
-	//  life cycle
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
-	 * terminally failed.
-	 */
-	public void shutdown() throws Exception {
-		synchronized (lock) {
-			if (!shutdown) {
-				shutdown = true;
-
-				LOG.info("Shutting down the job dispatcher");
-
-				// in this shutdown code we copy the references to the stack first,
-				// to avoid concurrent modification
-
-				Throwable exception = null;
-
-				JobManagerRunner[] runners = this.runners;
-				if (runners != null) {
-					this.runners = null;
-
-					for (JobManagerRunner runner : runners) {
-						try {
-							runner.shutdown();
-						} catch (Throwable e) {
-							exception = ExceptionUtils.firstOrSuppressed(e, exception);
-						}
-					}
-				}
-
-				// shut down the JobManagerSharedServices
-				try {
-					jobManagerSharedServices.shutdown();
-				} catch (Throwable throwable) {
-					exception = ExceptionUtils.firstOrSuppressed(throwable, exception);
-				}
-
-				if (exception != null) {
-					throw new FlinkException("Could not properly terminate all JobManagerRunners.", exception);
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  submitting jobs
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This method executes a job in detached mode. The method returns immediately after the job
-	 * has been added to the
-	 *
-	 * @param job  The Flink job to execute
-	 *
-	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
-	 *         or if the job terminally failed.
-	 */
-	public void runDetached(JobGraph job) throws JobExecutionException {
-		checkNotNull(job);
-
-		LOG.info("Received job for detached execution: {} ({})", job.getName(), job.getJobID());
-
-		synchronized (lock) {
-			checkState(!shutdown, "mini cluster is shut down");
-			checkState(runners == null, "mini cluster can only execute one job at a time");
-
-			DetachedFinalizer finalizer = new DetachedFinalizer(job.getJobID(), numJobManagers);
-
-			this.runners = startJobRunners(job, finalizer, finalizer);
-		}
-	}
-
-	/**
-	 * This method runs a job in blocking mode. The method returns only after the job
-	 * completed successfully, or after it failed terminally.
-	 *
-	 * @param job  The Flink job to execute
-	 * @return The result of the job execution
-	 *
-	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
-	 *         or if the job terminally failed.
-	 */
-	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
-		checkNotNull(job);
-
-		LOG.info("Received job for blocking execution: {} ({})", job.getName(), job.getJobID());
-		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
-
-		synchronized (lock) {
-			checkState(!shutdown, "mini cluster is shut down");
-			checkState(runners == null, "mini cluster can only execute one job at a time");
-
-			this.runners = startJobRunners(job, sync, sync);
-		}
-
-		try {
-			return sync.getResult();
-		}
-		finally {
-			// always clear the status for the next job
-			runners = null;
-			clearJobRunningState(job.getJobID());
-		}
-	}
-
-	private JobManagerRunner[] startJobRunners(
-			JobGraph job,
-			OnCompletionActions onCompletion,
-			FatalErrorHandler errorHandler) throws JobExecutionException {
-
-		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
-
-		// start all JobManagers
-		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
-		for (int i = 0; i < numJobManagers; i++) {
-			try {
-				runners[i] = new JobManagerRunner(
-					ResourceID.generate(),
-					job,
-					configuration,
-					rpcServices[i],
-					haServices,
-					heartbeatServices,
-					blobServer,
-					jobManagerSharedServices,
-					metricRegistry,
-					null);
-
-				final int index = i;
-
-				runners[i].getResultFuture()
-					.whenComplete(
-						(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
-							try {
-								runners[index].shutdown();
-							} catch (Exception e) {
-								errorHandler.onFatalError(e);
-							}
-
-							if (archivedExecutionGraph != null) {
-								onCompletion.jobReachedGloballyTerminalState(archivedExecutionGraph);
-							} else {
-								final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-
-								if (strippedThrowable instanceof JobNotFinishedException) {
-									onCompletion.jobFinishedByOther();
-								} else {
-									errorHandler.onFatalError(strippedThrowable);
-								}
-							}
-						});
-
-				runners[i].start();
-			}
-			catch (Throwable t) {
-				// shut down all the ones so far
-				for (int k = 0; k <= i; k++) {
-					try {
-						if (runners[i] != null) {
-							runners[i].shutdown();
-						}
-					} catch (Throwable ignored) {
-						// silent shutdown
-					}
-				}
-
-				// un-register the job from the high.availability services
-				try {
-					haServices.getRunningJobsRegistry().setJobFinished(job.getJobID());
-				}
-				catch (Throwable tt) {
-					LOG.warn("Could not properly unregister job from high-availability services", tt);
-				}
-
-				throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
-			}
-		}
-
-		return runners;
-	}
-
-	private void clearJobRunningState(JobID jobID) {
-		// we mark the job as finished in the HA services, so need
-		// to remove the data after job finished
-		try {
-			haServices.getRunningJobsRegistry().clearJob(jobID);
-
-			// TODO: Remove job data from BlobServer
-		}
-		catch (Throwable t) {
-			LOG.warn("Could not clear job {} at the status registry of the high-availability services", jobID, t);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test methods to simulate job master failures
-	// ------------------------------------------------------------------------
-
-//	public void killJobMaster(int which) {
-//		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
-//		checkState(!shutdown, "mini cluster is shut down");
-//
-//		JobManagerRunner[] runners = this.runners;
-//		checkState(runners != null, "mini cluster it not executing a job right now");
-//
-//		runners[which].shutdown(new Throwable("kill JobManager"));
-//	}
-
-	// ------------------------------------------------------------------------
-	//  utility classes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Simple class that waits for all runners to have reported that they are done.
-	 * In the case of a high-availability test setup, there may be multiple runners.
-	 * After that, it marks the mini cluster as ready to receive new jobs.
-	 */
-	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
-
-		private final JobID jobID;
-
-		private final AtomicInteger numJobManagersToWaitFor;
-
-		private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) {
-			this.jobID = jobID;
-			this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
-		}
-
-		@Override
-		public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
-			decrementCheckAndCleanup();
-		}
-
-		@Override
-		public void jobFinishedByOther() {
-			decrementCheckAndCleanup();
-		}
-
-		@Override
-		public void onFatalError(Throwable exception) {
-			decrementCheckAndCleanup();
-		}
-
-		private void decrementCheckAndCleanup() {
-			if (numJobManagersToWaitFor.decrementAndGet() == 0) {
-				MiniClusterJobDispatcher.this.runners = null;
-				MiniClusterJobDispatcher.this.clearJobRunningState(jobID);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This class is used to sync on blocking jobs across multiple runners.
-	 * Only after all runners reported back that they are finished, the
-	 * result will be released.
-	 *
-	 * <p>That way it is guaranteed that after the blocking job submit call returns,
-	 * the dispatcher is immediately free to accept another job.
-	 */
-	private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
-
-		private final JobID jobId;
-
-		private final CountDownLatch jobMastersToWaitFor;
-
-		private volatile Throwable runnerException;
-
-		private volatile JobResult result;
-
-		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
-			this.jobId = jobId;
-			this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
-		}
-
-		@Override
-		public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
-			this.result = JobResult.createFrom(executionGraph);
-			jobMastersToWaitFor.countDown();
-		}
-
-		@Override
-		public void jobFinishedByOther() {
-			this.jobMastersToWaitFor.countDown();
-		}
-
-		@Override
-		public void onFatalError(Throwable exception) {
-			if (runnerException == null) {
-				runnerException = exception;
-			}
-
-			jobMastersToWaitFor.countDown();
-		}
-
-		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
-			jobMastersToWaitFor.await();
-
-			final Throwable runnerException = this.runnerException;
-			final JobResult result = this.result;
-
-			// (1) we check if the job terminated with an exception
-			// (2) we check whether the job completed successfully
-			// (3) we check if we have exceptions from the JobManagers. the job may still have
-			//     completed successfully in that case, if multiple JobMasters were running
-			//     and other took over. only if all encounter a fatal error, the job cannot finish
-
-			if (result != null && !result.isSuccess()) {
-				checkState(result.getSerializedThrowable().isPresent());
-				final Throwable jobFailureCause = result.getSerializedThrowable()
-					.get()
-					.deserializeError(ClassLoader.getSystemClassLoader());
-				if (jobFailureCause instanceof JobExecutionException) {
-					throw (JobExecutionException) jobFailureCause;
-				}
-				else {
-					throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
-				}
-			}
-			else if (result != null) {
-				try {
-					return new JobExecutionResult(
-						jobId,
-						result.getNetRuntime(),
-						AccumulatorHelper.deserializeAccumulators(
-							result.getAccumulatorResults(),
-							ClassLoader.getSystemClassLoader()));
-				} catch (final IOException | ClassNotFoundException e) {
-					throw new JobExecutionException(result.getJobId(), e);
-				}
-			}
-			else if (runnerException != null) {
-				throw new JobExecutionException(jobId,
-						"The job execution failed because all JobManagers encountered fatal errors", runnerException);
-			}
-			else {
-				throw new IllegalStateException("Bug: Job finished with neither error nor result.");
-			}
-		}
-	}
-}
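
For reference, the error and result handling that the deleted BlockingJobSync performed is the part that now has to happen wherever a JobResult is converted back into a client-facing JobExecutionResult. A minimal sketch of that translation, using only the JobResult accessors seen in the removed code (class and package names are assumptions based on the imports visible in this commit), could look like this:

    import java.io.IOException;

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.accumulators.AccumulatorHelper;
    import org.apache.flink.runtime.client.JobExecutionException;
    import org.apache.flink.runtime.jobmaster.JobResult;

    public final class JobResultTranslation {

        /** Mirrors the checks of the removed BlockingJobSync#getResult(). */
        public static JobExecutionResult toExecutionResult(
                JobID jobId,
                JobResult result,
                ClassLoader classLoader) throws JobExecutionException {

            if (!result.isSuccess()) {
                // a failed JobResult always carries a serialized failure cause
                final Throwable cause = result.getSerializedThrowable()
                    .get()
                    .deserializeError(classLoader);

                if (cause instanceof JobExecutionException) {
                    throw (JobExecutionException) cause;
                }
                throw new JobExecutionException(jobId, "The job execution failed", cause);
            }

            try {
                return new JobExecutionResult(
                    jobId,
                    result.getNetRuntime(),
                    AccumulatorHelper.deserializeAccumulators(
                        result.getAccumulatorResults(),
                        classLoader));
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(jobId, e);
            }
        }
    }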

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index a8c402a..0b0cbf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -23,14 +23,16 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -38,6 +40,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -55,7 +58,7 @@ import scala.concurrent.duration.FiniteDuration;
  * {@link FlinkMiniCluster}, because the remote environment cannot retrieve the current leader
  * session id.
  */
-public class StandaloneMiniCluster {
+public class StandaloneMiniCluster implements AutoCloseableAsync {
 
 	private static final String LOCAL_HOSTNAME = "localhost";
 
@@ -141,7 +144,8 @@ public class StandaloneMiniCluster {
 		return configuration;
 	}
 
-	public void close() throws Exception {
+	@Override
+	public CompletableFuture<Void> closeAsync() {
 		Exception exception = null;
 
 		try {
@@ -170,7 +174,9 @@ public class StandaloneMiniCluster {
 		}
 
 		if (exception != null) {
-			throw exception;
+			return FutureUtils.completedExceptionally(exception);
+		} else {
+			return CompletableFuture.completedFuture(null);
 		}
 	}
 }
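
StandaloneMiniCluster now implements AutoCloseableAsync, so callers can either wait on the returned future themselves or keep using a blocking close(), which the interface derives from closeAsync(). A minimal usage sketch, with the timeout and the surrounding class purely illustrative:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;

    public final class StandaloneMiniClusterShutdown {

        public static void main(String[] args) throws Exception {
            final StandaloneMiniCluster cluster = new StandaloneMiniCluster(new Configuration());

            // ... run work against cluster.getHostname() / cluster.getPort() ...

            // non-blocking: the future completes, possibly exceptionally, once
            // the contained services have been stopped
            final CompletableFuture<Void> termination = cluster.closeAsync();
            termination.get(30, TimeUnit.SECONDS);

            // alternatively, cluster.close() blocks on the same future via the
            // default method of AutoCloseableAsync
        }
    }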

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 58a2789..26316ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -89,6 +89,10 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 			this);
 	}
 
+	public ResourceManagerGateway getResourceManageGateway() {
+		return resourceManager.getSelfGateway(ResourceManagerGateway.class);
+	}
+
 	//-------------------------------------------------------------------------------------
 	// Lifecycle management
 	//-------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 44e3a67..7948ba1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -714,7 +714,7 @@ abstract class FlinkMiniCluster(
     submitJobAndWait(jobGraph, false)
   }
 
-  override def terminate() = {
+  override def closeAsync() = {
     try {
       stop()
       CompletableFuture.completedFuture(null)

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index 6a4f678..d974998 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.minicluster;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
@@ -28,6 +30,7 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -39,16 +42,23 @@ import java.io.IOException;
 @Category(Flip6.class)
 public class MiniClusterITCase extends TestLogger {
 
+	private static Configuration configuration;
+
+	@BeforeClass
+	public static void setup() {
+		configuration = new Configuration();
+		configuration.setInteger(WebOptions.PORT, 0);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Simple Job Running Tests
 	// ------------------------------------------------------------------------
 
-	private static final MiniClusterConfiguration defaultConfiguration = null;
-
 	@Test
 	public void runJobWithSingleRpcService() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
 			.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+			.setConfiguration(configuration)
 			.build();
 
 		MiniCluster miniCluster = new MiniCluster(cfg);
@@ -65,6 +75,7 @@ public class MiniClusterITCase extends TestLogger {
 	public void runJobWithMultipleRpcServices() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
 			.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.DEDICATED)
+			.setConfiguration(configuration)
 			.build();
 
 		MiniCluster miniCluster = new MiniCluster(cfg);
@@ -77,22 +88,6 @@ public class MiniClusterITCase extends TestLogger {
 		}
 	}
 
-	@Test
-	public void runJobWithMultipleJobManagers() throws Exception {
-		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
-		.setNumJobManagers(3)
-		.build();
-
-		MiniCluster miniCluster = new MiniCluster(cfg);
-		try {
-			miniCluster.start();
-			executeJob(miniCluster);
-		}
-		finally {
-			miniCluster.shutdown();
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
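
Outside of JUnit, the pattern the updated tests follow, binding the REST endpoint to an ephemeral port and handing the Configuration to the builder, looks roughly like the sketch below. The NoOpInvokable job graph is only an illustration of a submittable job, not part of this commit:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.WebOptions;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
    import org.apache.flink.runtime.testtasks.NoOpInvokable;

    public final class MiniClusterUsage {

        public static void main(String[] args) throws Exception {
            final Configuration configuration = new Configuration();
            // bind the REST endpoint to a free port to avoid clashes between clusters
            configuration.setInteger(WebOptions.PORT, 0);

            final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
                .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
                .setConfiguration(configuration)
                .build();

            final MiniCluster miniCluster = new MiniCluster(cfg);
            try {
                miniCluster.start();

                final JobVertex vertex = new JobVertex("no-op");
                vertex.setInvokableClass(NoOpInvokable.class);
                vertex.setParallelism(1);

                miniCluster.executeJobBlocking(new JobGraph("example", vertex));
            } finally {
                miniCluster.close();
            }
        }
    }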

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 54ed05e..dbdc052 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -143,7 +143,7 @@ object FlinkShell {
   ): (String, Int, Option[Either[StandaloneMiniCluster, ClusterClient[_]]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
-        val config = GlobalConfiguration.loadConfiguration()
+        val config = configuration
         config.setInteger(JobManagerOptions.PORT, 0)
 
         val miniCluster = new StandaloneMiniCluster(config)
@@ -195,7 +195,7 @@ object FlinkShell {
       val conf = cluster match {
         case Some(Left(miniCluster)) => miniCluster.getConfiguration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
-        case None => GlobalConfiguration.loadConfiguration()
+        case None => configuration
       }
 
       println(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 6148450..95a2999 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -23,10 +23,12 @@ import java.io._
 import akka.actor.ActorRef
 import akka.pattern.Patterns
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, CoreOptions, GlobalConfiguration}
+import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
-import org.junit.{AfterClass, Assert, BeforeClass, Ignore, Test}
+import org.junit.rules.TemporaryFolder
+import org.junit._
 
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.FiniteDuration
@@ -36,6 +38,11 @@ class ScalaShellITCase extends TestLogger {
 
   import ScalaShellITCase._
 
+  val _temporaryFolder = new TemporaryFolder
+
+  @Rule
+  def temporaryFolder = _temporaryFolder
+
   /** Prevent re-creation of environment */
   @Test
   def testPreventRecreationBatch(): Unit = {
@@ -269,10 +276,8 @@ class ScalaShellITCase extends TestLogger {
     val oldOut: PrintStream = System.out
     System.setOut(new PrintStream(baos))
 
-    val confFile: String = classOf[ScalaShellLocalStartupITCase]
-      .getResource("/flink-conf.yaml")
-      .getFile
-    val confDir = new File(confFile).getAbsoluteFile.getParent
+    val dir = temporaryFolder.newFolder()
+    BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
 
     val args = cluster match {
       case Some(cl) =>
@@ -281,12 +286,10 @@ class ScalaShellITCase extends TestLogger {
           cl.getHostname,
           Integer.toString(cl.getPort),
           "--configDir",
-          confDir)
+          dir.getAbsolutePath)
       case None => throw new IllegalStateException("Cluster has not been started.")
     }
 
-
-
     //start scala shell with initialized
     // buffered reader for testing
     FlinkShell.bufferedReader = Some(in)
@@ -319,6 +322,7 @@ object ScalaShellITCase {
   @BeforeClass
   def beforeAll(): Unit = {
     configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
+    configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE)
 
     cluster = Option(new StandaloneMiniCluster(configuration))
   }
@@ -351,14 +355,14 @@ object ScalaShellITCase {
           case Some(ej) => new FlinkILoop(
             cl.getHostname,
             cl.getPort,
-            GlobalConfiguration.loadConfiguration(),
+            configuration,
             Option(Array(ej)),
             in, new PrintWriter(out))
 
           case None => new FlinkILoop(
             cl.getHostname,
             cl.getPort,
-            GlobalConfiguration.loadConfiguration(),
+            configuration,
             in, new PrintWriter(out))
         }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index 6f44bfe..34832e7 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -20,12 +20,19 @@ package org.apache.flink.api.scala
 
 import java.io._
 
+import org.apache.flink.configuration.{Configuration, CoreOptions}
+import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.util.TestLogger
-import org.junit.Test
-import org.junit.Assert
+import org.junit.{Assert, Rule, Test}
+import org.junit.rules.TemporaryFolder
 
 class ScalaShellLocalStartupITCase extends TestLogger {
 
+  val _temporaryFolder = new TemporaryFolder
+
+  @Rule
+  def temporaryFolder = _temporaryFolder
+
   /**
    * tests flink shell with local setup through startup script in bin folder
    * for both streaming and batch
@@ -77,12 +84,13 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     val oldOut: PrintStream = System.out
     System.setOut(new PrintStream(baos))
 
-    val confFile: String = classOf[ScalaShellLocalStartupITCase]
-      .getResource("/flink-conf.yaml")
-      .getFile
-    val confDir = new File(confFile).getAbsoluteFile.getParent
+    val configuration = new Configuration()
+    configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE)
+
+    val dir = temporaryFolder.newFolder()
+    BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
 
-    val args: Array[String] = Array("local", "--configDir", confDir)
+    val args: Array[String] = Array("local", "--configDir", dir.getAbsolutePath)
 
     //start flink scala shell
     FlinkShell.bufferedReader = Some(in);
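
Both Scala shell tests now generate their --configDir on the fly instead of reading a checked-in flink-conf.yaml. A hedged Java sketch of the same trick for reuse elsewhere; the temporary-directory handling here is illustrative, the tests themselves use JUnit's TemporaryFolder:

    import java.io.File;
    import java.nio.file.Files;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.CoreOptions;
    import org.apache.flink.runtime.clusterframework.BootstrapTools;

    public final class GeneratedConfDir {

        public static void main(String[] args) throws Exception {
            final Configuration configuration = new Configuration();
            configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);

            // write the configuration into a throw-away directory ...
            final File confDir = Files.createTempDirectory("flink-conf").toFile();
            BootstrapTools.writeConfiguration(configuration, new File(confDir, "flink-conf.yaml"));

            // ... and point the shell (or any other CLI tool) at it
            final String[] shellArgs = {"local", "--configDir", confDir.getAbsolutePath()};
            System.out.println(String.join(" ", shellArgs));
        }
    }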

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index c5952fb..5941ee5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.environment;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -83,6 +84,8 @@ public class Flip6LocalStreamEnvironment extends LocalStreamEnvironment {
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.conf);
 
+		configuration.setInteger(RestOptions.REST_PORT, 0);
+
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
 			.setConfiguration(configuration)
 			.setNumSlotsPerTaskManager(jobGraph.getMaximumParallelism())
@@ -96,6 +99,8 @@ public class Flip6LocalStreamEnvironment extends LocalStreamEnvironment {
 
 		try {
 			miniCluster.start();
+			configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+
 			return miniCluster.executeJobBlocking(jobGraph);
 		}
 		finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 69070c6..9bb1ae9 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -100,7 +101,7 @@ public class MiniClusterResource extends ExternalResource {
 		TestStreamEnvironment.unsetAsContext();
 		TestEnvironment.unsetAsContext();
 
-		final CompletableFuture<?> terminationFuture = jobExecutorService.terminate();
+		final CompletableFuture<?> terminationFuture = jobExecutorService.closeAsync();
 
 		try {
 			terminationFuture.get(
@@ -141,8 +142,13 @@ public class MiniClusterResource extends ExternalResource {
 
 	@Nonnull
 	private JobExecutorService startFlip6MiniCluster() throws Exception {
+		final Configuration configuration = miniClusterResourceConfiguration.getConfiguration();
+
+		// set rest port to 0 to avoid clashes with concurrent MiniClusters
+		configuration.setInteger(RestOptions.REST_PORT, 0);
+
 		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
-			.setConfiguration(miniClusterResourceConfiguration.getConfiguration())
+			.setConfiguration(configuration)
 			.setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers())
 			.setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager())
 			.build();
@@ -151,6 +157,9 @@ public class MiniClusterResource extends ExternalResource {
 
 		miniCluster.start();
 
+		// update the port of the rest endpoint
+		configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+
 		return miniCluster;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index 36eded6..aeff578 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -28,8 +28,13 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -40,10 +45,12 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
 
 /**
  * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
@@ -61,20 +68,46 @@ public class RemoteEnvironmentITCase extends TestLogger {
 
 	private static Configuration configuration;
 
-	private static StandaloneMiniCluster cluster;
+	private static AutoCloseableAsync resource;
+
+	private static String hostname;
+
+	private static int port;
 
 	@BeforeClass
 	public static void setupCluster() throws Exception {
 		configuration = new Configuration();
 
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+		if (CoreOptions.FLIP6_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+			configuration.setInteger(WebOptions.PORT, 0);
+			final MiniCluster miniCluster = new MiniCluster(
+				new MiniClusterConfiguration.Builder()
+					.setConfiguration(configuration)
+					.setNumSlotsPerTaskManager(TM_SLOTS)
+					.build());
+
+			miniCluster.start();
+
+			final URI uri = miniCluster.getRestAddress();
+			hostname = uri.getHost();
+			port = uri.getPort();
 
-		cluster = new StandaloneMiniCluster(configuration);
+			configuration.setInteger(WebOptions.PORT, port);
+
+			resource = miniCluster;
+		} else {
+			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+			final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration);
+			hostname = standaloneMiniCluster.getHostname();
+			port = standaloneMiniCluster.getPort();
+
+			resource = standaloneMiniCluster;
+		}
 	}
 
 	@AfterClass
 	public static void tearDownCluster() throws Exception {
-		cluster.close();
+		resource.close();
 	}
 
 	/**
@@ -82,12 +115,13 @@ public class RemoteEnvironmentITCase extends TestLogger {
 	 */
 	@Test(expected = FlinkException.class)
 	public void testInvalidAkkaConfiguration() throws Throwable {
+		assumeTrue(CoreOptions.OLD_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
 		Configuration config = new Configuration();
 		config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				cluster.getHostname(),
-				cluster.getPort(),
+				hostname,
+				port,
 				config
 		);
 		env.getConfig().disableSysoutLogging();
@@ -111,8 +145,8 @@ public class RemoteEnvironmentITCase extends TestLogger {
 		config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				cluster.getHostname(),
-				cluster.getPort(),
+				hostname,
+				port,
 				config
 		);
 		env.setParallelism(USER_DOP);
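
The Flip-6 branch of this test illustrates the point of the whole change: once the MiniCluster exposes a REST endpoint, a RemoteEnvironment can talk to it like to any other cluster. A rough sketch, with the host and port taken from getRestAddress() as above; explicitly pinning CoreOptions.MODE to FLIP6_MODE is an assumption made here so the remote environment picks the REST-based client:

    import java.net.URI;

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.CoreOptions;
    import org.apache.flink.configuration.WebOptions;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

    public final class RemoteEnvironmentAgainstMiniCluster {

        public static void main(String[] args) throws Exception {
            final Configuration configuration = new Configuration();
            configuration.setString(CoreOptions.MODE, CoreOptions.FLIP6_MODE);
            configuration.setInteger(WebOptions.PORT, 0);

            final MiniCluster miniCluster = new MiniCluster(
                new MiniClusterConfiguration.Builder()
                    .setConfiguration(configuration)
                    .setNumSlotsPerTaskManager(4)
                    .build());

            try {
                miniCluster.start();

                // the REST endpoint was bound to a random free port; feed it back
                // into the configuration so the client side can pick it up
                final URI restAddress = miniCluster.getRestAddress();
                configuration.setInteger(WebOptions.PORT, restAddress.getPort());

                final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                    restAddress.getHost(), restAddress.getPort(), configuration);

                env.fromElements(1, 2, 3).map(i -> i * 2).print();
            } finally {
                miniCluster.close();
            }
        }
    }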

http://git-wip-us.apache.org/repos/asf/flink/blob/2a18f053/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 13d6804..f76375d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -147,12 +148,15 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 
 			final Throwable[] errorRef = new Throwable[1];
 
+			final Configuration configuration = new Configuration();
+			configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+
 			// start the test program, which infinitely blocks
 			Runnable programRunner = new Runnable() {
 				@Override
 				public void run() {
 					try {
-						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+						ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
 						env.setParallelism(2);
 						env.setRestartStrategy(RestartStrategies.noRestart());
 						env.getConfig().disableSysoutLogging();

